From 9fbb7f50d97c85f5e28947a9b4dbe71effba8d6e Mon Sep 17 00:00:00 2001 From: Roy Razon Date: Tue, 1 Aug 2023 17:57:54 +0300 Subject: [PATCH] cta: docker API (#149) * compose tunnel agent: add docker API - passthrough API to docker on port 3001 - websocket impl for exec and logs * add option to show preevy_proxy url * tunnel-server: fix error handling * tunnel-server: fix ws auth --- packages/cli-common/src/lib/flags.ts | 4 + packages/cli/src/commands/up.ts | 1 + packages/cli/src/commands/urls.ts | 1 + packages/compose-tunnel-agent/Dockerfile.dev | 3 + .../docker-compose.override.yml | 3 +- .../compose-tunnel-agent/docker-compose.yml | 1 + packages/compose-tunnel-agent/index.ts | 71 +++-- packages/compose-tunnel-agent/jest.config.js | 5 + packages/compose-tunnel-agent/package.json | 15 +- packages/compose-tunnel-agent/src/docker.ts | 1 + .../src/http/api-server.ts | 25 ++ .../src/http/docker-proxy/index.test.ts | 250 ++++++++++++++++++ .../src/http/docker-proxy/index.ts | 77 ++++++ .../src/http/docker-proxy/ws/handler.ts | 32 +++ .../src/http/docker-proxy/ws/handlers/exec.ts | 42 +++ .../src/http/docker-proxy/ws/handlers/logs.ts | 39 +++ .../src/http/docker-proxy/ws/index.ts | 10 + .../src/http/http-server-helpers.ts | 117 ++++++++ .../compose-tunnel-agent/src/http/index.ts | 35 +++ .../src/http/query-params.ts | 20 ++ packages/compose-tunnel-agent/src/web.ts | 45 ---- packages/core/src/commands/urls.ts | 3 + .../core/src/compose-tunnel-agent-client.ts | 23 +- tunnel-server/index.ts | 10 +- tunnel-server/src/app.ts | 44 +-- tunnel-server/src/auth.ts | 12 + tunnel-server/src/errors.ts | 32 --- tunnel-server/src/http-server-helpers.ts | 117 ++++++++ tunnel-server/src/proxy.ts | 120 ++++----- tunnel-server/src/session.ts | 14 +- tunnel-server/src/ssh-server.ts | 4 +- yarn.lock | 14 +- 32 files changed, 989 insertions(+), 201 deletions(-) create mode 100644 packages/compose-tunnel-agent/Dockerfile.dev create mode 100644 packages/compose-tunnel-agent/jest.config.js create mode 100644 packages/compose-tunnel-agent/src/http/api-server.ts create mode 100644 packages/compose-tunnel-agent/src/http/docker-proxy/index.test.ts create mode 100644 packages/compose-tunnel-agent/src/http/docker-proxy/index.ts create mode 100644 packages/compose-tunnel-agent/src/http/docker-proxy/ws/handler.ts create mode 100644 packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/exec.ts create mode 100644 packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/logs.ts create mode 100644 packages/compose-tunnel-agent/src/http/docker-proxy/ws/index.ts create mode 100644 packages/compose-tunnel-agent/src/http/http-server-helpers.ts create mode 100644 packages/compose-tunnel-agent/src/http/index.ts create mode 100644 packages/compose-tunnel-agent/src/http/query-params.ts delete mode 100644 packages/compose-tunnel-agent/src/web.ts delete mode 100644 tunnel-server/src/errors.ts create mode 100644 tunnel-server/src/http-server-helpers.ts diff --git a/packages/cli-common/src/lib/flags.ts b/packages/cli-common/src/lib/flags.ts index 82432505..aa74799e 100644 --- a/packages/cli-common/src/lib/flags.ts +++ b/packages/cli-common/src/lib/flags.ts @@ -52,4 +52,8 @@ export const urlFlags = { description: 'Include access credentials for basic auth for each service URL', default: false, }), + 'show-preevy-service-urls': Flags.boolean({ + description: 'Show URLs for internal Preevy services', + default: false, + }), } as const diff --git a/packages/cli/src/commands/up.ts b/packages/cli/src/commands/up.ts index 77c8f70a..39cbf507 100644 --- a/packages/cli/src/commands/up.ts +++ b/packages/cli/src/commands/up.ts @@ -91,6 +91,7 @@ export default class Up extends MachineCreationDriverCommand { envId, tunnelingKey, includeAccessCredentials: flags['include-access-credentials'], + showPreevyService: flags['show-preevy-service-urls'], retryOpts: { minTimeout: 1000, maxTimeout: 2000, diff --git a/packages/cli/src/commands/urls.ts b/packages/cli/src/commands/urls.ts index 64ff0f86..d695d68e 100644 --- a/packages/cli/src/commands/urls.ts +++ b/packages/cli/src/commands/urls.ts @@ -88,6 +88,7 @@ export default class Urls extends ProfileCommand { serviceAndPort: args.service ? { service: args.service, port: args.port } : undefined, tunnelingKey, includeAccessCredentials: flags['include-access-credentials'], + showPreevyService: flags['show-preevy-service-urls'], retryOpts: { retries: 2 }, }) diff --git a/packages/compose-tunnel-agent/Dockerfile.dev b/packages/compose-tunnel-agent/Dockerfile.dev new file mode 100644 index 00000000..be1f7a6c --- /dev/null +++ b/packages/compose-tunnel-agent/Dockerfile.dev @@ -0,0 +1,3 @@ +FROM node:18-alpine as development +WORKDIR /app +CMD [ "yarn", "-s", "dev" ] diff --git a/packages/compose-tunnel-agent/docker-compose.override.yml b/packages/compose-tunnel-agent/docker-compose.override.yml index 7686d905..d4852a01 100644 --- a/packages/compose-tunnel-agent/docker-compose.override.yml +++ b/packages/compose-tunnel-agent/docker-compose.override.yml @@ -4,8 +4,7 @@ services: preevy_proxy: build: context: . - target: development + dockerfile: Dockerfile.dev volumes: - ${HOME}/.ssh:/root/.ssh - diff --git a/packages/compose-tunnel-agent/docker-compose.yml b/packages/compose-tunnel-agent/docker-compose.yml index 09f8af62..47f21786 100644 --- a/packages/compose-tunnel-agent/docker-compose.yml +++ b/packages/compose-tunnel-agent/docker-compose.yml @@ -17,6 +17,7 @@ services: ports: - 3000 + - 3001 # healthcheck: # test: wget --no-verbose --tries=1 --spider http://localhost:3000/healthz || exit 1 diff --git a/packages/compose-tunnel-agent/index.ts b/packages/compose-tunnel-agent/index.ts index 96706deb..a5253580 100644 --- a/packages/compose-tunnel-agent/index.ts +++ b/packages/compose-tunnel-agent/index.ts @@ -2,16 +2,21 @@ import fs from 'fs' import path from 'path' import Docker from 'dockerode' import { inspect } from 'node:util' +import http from 'node:http' import { rimraf } from 'rimraf' import pino from 'pino' import pinoPretty from 'pino-pretty' import { EOL } from 'os' import { ConnectionCheckResult, requiredEnv, checkConnection, formatPublicKey, parseSshUrl, SshConnectionConfig, tunnelNameResolver } from '@preevy/common' import createDockerClient from './src/docker' -import createWebServer from './src/web' +import createApiServerHandler from './src/http/api-server' import { sshClient as createSshClient } from './src/ssh' +import { createDockerProxyHandlers } from './src/http/docker-proxy' +import { tryHandler, tryUpgradeHandler } from './src/http/http-server-helpers' +import { httpServerHandlers } from './src/http' const homeDir = process.env.HOME || '/root' +const dockerSocket = '/var/run/docker.sock' const readDir = async (dir: string) => { try { @@ -70,11 +75,11 @@ const formatConnectionCheckResult = ( const writeLineToStdout = (s: string) => [s, EOL].forEach(d => process.stdout.write(d)) -const main = async () => { - const log = pino({ - level: process.env.DEBUG || process.env.DOCKER_PROXY_DEBUG ? 'debug' : 'info', - }, pinoPretty({ destination: pino.destination(process.stderr) })) +const log = pino({ + level: process.env.DEBUG || process.env.DOCKER_PROXY_DEBUG ? 'debug' : 'info', +}, pinoPretty({ destination: pino.destination(process.stderr) })) +const main = async () => { const { connectionConfig, sshUrl } = await sshConnectionConfigFromEnv() log.debug('ssh config: %j', { @@ -92,20 +97,21 @@ const main = async () => { process.exit(0) } - const docker = new Docker({ socketPath: '/var/run/docker.sock' }) + const docker = new Docker({ socketPath: dockerSocket }) const dockerClient = createDockerClient({ log: log.child({ name: 'docker' }), docker, debounceWait: 500 }) + const sshLog = log.child({ name: 'ssh' }) const sshClient = await createSshClient({ connectionConfig, tunnelNameResolver: tunnelNameResolver({ userDefinedSuffix: process.env.TUNNEL_URL_SUFFIX }), - log: log.child({ name: 'ssh' }), + log: sshLog, onError: err => { log.error(err) process.exit(1) }, }) - log.info('ssh client connected to %j', sshUrl) + sshLog.info('ssh client connected to %j', sshUrl) let currentTunnels = dockerClient.getRunningServices().then(services => sshClient.updateTunnels(services)) void dockerClient.startListening({ @@ -115,25 +121,50 @@ const main = async () => { }, }) - const listenAddress = process.env.PORT ?? 3000 - if (typeof listenAddress === 'string' && Number.isNaN(Number(listenAddress))) { - await rimraf(listenAddress) + const apiListenAddress = process.env.PORT ?? 3000 + if (typeof apiListenAddress === 'string' && Number.isNaN(Number(apiListenAddress))) { + await rimraf(apiListenAddress) } - const webServer = createWebServer({ - log: log.child({ name: 'web' }), - currentSshState: async () => ( - await currentTunnels - ), + const { handler, upgradeHandler } = httpServerHandlers({ + log: log.child({ name: 'http' }), + apiHandler: createApiServerHandler({ + log: log.child({ name: 'api' }), + currentSshState: async () => (await currentTunnels), + }), + dockerProxyHandlers: createDockerProxyHandlers({ + log: log.child({ name: 'docker-proxy' }), + dockerSocket, + docker, + }), + dockerProxyPrefix: '/docker/', }) - .listen(listenAddress, () => { - log.info(`listening on ${inspect(webServer.address())}`) + + const httpLog = log.child({ name: 'http' }) + + const httpServer = http.createServer(tryHandler({ log: httpLog }, async (req, res) => { + httpLog.debug('request %s %s', req.method, req.url) + return await handler(req, res) + })) + .on('upgrade', tryUpgradeHandler({ log: httpLog }, async (req, socket, head) => { + httpLog.debug('upgrade %s %s', req.method, req.url) + return await upgradeHandler(req, socket, head) + })) + .listen(apiListenAddress, () => { + httpLog.info(`API server listening on ${inspect(httpServer.address())}`) }) .on('error', err => { - log.error(err) + httpLog.error(err) process.exit(1) }) .unref() } -void main() +void main(); + +['SIGTERM', 'SIGINT'].forEach(signal => { + process.once(signal, async () => { + log.info(`shutting down on ${signal}`) + process.exit(0) + }) +}) diff --git a/packages/compose-tunnel-agent/jest.config.js b/packages/compose-tunnel-agent/jest.config.js new file mode 100644 index 00000000..b413e106 --- /dev/null +++ b/packages/compose-tunnel-agent/jest.config.js @@ -0,0 +1,5 @@ +/** @type {import('ts-jest').JestConfigWithTsJest} */ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', +}; \ No newline at end of file diff --git a/packages/compose-tunnel-agent/package.json b/packages/compose-tunnel-agent/package.json index dcab1ad6..98dc9d69 100644 --- a/packages/compose-tunnel-agent/package.json +++ b/packages/compose-tunnel-agent/package.json @@ -17,12 +17,16 @@ "pino": "^8.11.0", "pino-pretty": "^9.4.0", "rimraf": "^5.0.0", - "ssh2": "^1.12.0" + "ssh2": "^1.12.0", + "ws": "^8.13.0" }, "devDependencies": { + "@jest/globals": "^29.5.0", "@types/dockerode": "^3.3.14", + "@types/http-proxy": "^1.17.9", "@types/lodash": "^4.14.192", "@types/node": "18", + "@types/node-fetch": "^2.6.3", "@types/shell-escape": "^0.2.1", "@types/ssh2": "^1.11.8", "@typescript-eslint/eslint-plugin": "^5.55.0", @@ -30,18 +34,21 @@ "esbuild": "^0.17.14", "eslint": "^8.36.0", "husky": "^8.0.0", + "jest": "^29.4.3", "lint-staged": "^13.1.2", + "node-fetch": "2.6.9", "tsx": "^3.12.3", - "typescript": "^5.0.4" + "typescript": "^5.0.4", + "wait-for-expect": "^3.0.2" }, "scripts": { "start": "node out/index.js", "dev": "tsx watch ./index.ts", "lint": "eslint . --ext .ts,.tsx --cache", "clean": "rm -rf dist out", - "build": "node --version && node build.mjs", + "build": "yarn tsc --noEmit && node build.mjs", "prepack": "yarn build", "prepare": "cd ../.. && husky install", - "bump-to": "yarn version --no-commit-hooks --no-git-tag-version --new-version" + "test": "yarn jest" } } diff --git a/packages/compose-tunnel-agent/src/docker.ts b/packages/compose-tunnel-agent/src/docker.ts index 5eda239f..cfce3a42 100644 --- a/packages/compose-tunnel-agent/src/docker.ts +++ b/packages/compose-tunnel-agent/src/docker.ts @@ -60,6 +60,7 @@ const client = ({ stream.on('data', handler) log.info('listening on docker') void handler() + return { close: () => stream.removeAllListeners() } }, } } diff --git a/packages/compose-tunnel-agent/src/http/api-server.ts b/packages/compose-tunnel-agent/src/http/api-server.ts new file mode 100644 index 00000000..df1f7abd --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/api-server.ts @@ -0,0 +1,25 @@ +import url from 'node:url' +import { Logger } from '@preevy/common' +import { SshState } from '../ssh' +import { NotFoundError, respondAccordingToAccept, respondJson, tryHandler } from './http-server-helpers' + +const createApiServerHandler = ({ log, currentSshState }: { + log: Logger + currentSshState: ()=> Promise +}) => tryHandler({ log }, async (req, res) => { + const { pathname: path } = url.parse(req.url || '') + + if (path === '/tunnels') { + respondJson(res, await currentSshState()) + return + } + + if (path === '/healthz') { + respondAccordingToAccept(req, res, 'OK') + return + } + + throw new NotFoundError() +}) + +export default createApiServerHandler diff --git a/packages/compose-tunnel-agent/src/http/docker-proxy/index.test.ts b/packages/compose-tunnel-agent/src/http/docker-proxy/index.test.ts new file mode 100644 index 00000000..371140de --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/docker-proxy/index.test.ts @@ -0,0 +1,250 @@ +import http from 'node:http' +import net from 'node:net' +import { describe, expect, beforeAll, afterAll, test, jest, it } from '@jest/globals' +import { ChildProcess, spawn, exec } from 'child_process' +import pino from 'pino' +import pinoPretty from 'pino-pretty' +import Dockerode from 'dockerode' +import fetch from 'node-fetch' +import { inspect, promisify } from 'node:util' +import waitForExpect from 'wait-for-expect' +import WebSocket from 'ws' +import { createDockerProxyHandlers } from '.' + +const setupDockerContainer = () => { + let dockerProcess: ChildProcess + let containerName: string + let output: Buffer[] + jest.setTimeout(100000) + + beforeAll(() => { + containerName = `test-docker-proxy-${Math.random().toString(36).substring(2, 9)}` + output = [] + dockerProcess = spawn( + 'docker', + [ + ...`run --rm --name ${containerName} busybox sh -c`.split(' '), + 'while true; do echo "hello stdout"; >&2 echo "hello stderr"; sleep 0.1; done', + ] + ) + dockerProcess.stdout?.on('data', data => { output.push(data) }) + dockerProcess.stderr?.on('data', data => { output.push(data) }) + return new Promise((resolve, reject) => { + dockerProcess.stdout?.once('data', () => { resolve() }) + dockerProcess.once('error', reject) + dockerProcess.once('exit', (code, signal) => { + const outStr = Buffer.concat(output).toString('utf-8') + reject(new Error(`docker exited with code ${code} and signal ${signal}: ${outStr}`)) + }) + }) + }) + + afterAll(async () => { + dockerProcess.kill() + await promisify(exec)(`docker rm -f ${containerName}`) + }) + + return { + containerName: () => containerName, + } +} + +const setupDockerProxy = () => { + const log = pino({ + level: 'debug', + }, pinoPretty({ destination: pino.destination(process.stderr) })) + + let server: http.Server + let serverBaseUrl: string + + beforeAll(async () => { + const docker = new Dockerode() + const handlers = createDockerProxyHandlers({ log, docker, dockerSocket: '/var/run/docker.sock' }) + server = http.createServer(handlers.handler).on('upgrade', handlers.upgradeHandler) + + const serverPort = await new Promise(resolve => { + server.listen(0, () => { + resolve((server.address() as net.AddressInfo).port) + }) + }) + serverBaseUrl = `localhost:${serverPort}` + }) + + afterAll(async () => { + await promisify(server.close.bind(server))() + }) + + return { + serverBaseUrl: () => serverBaseUrl, + } +} + +const fetchJson = async (...args: Parameters) => { + const r = await fetch(...args) + if (!r.ok) { + throw new Error(`Fetch ${inspect(args)} failed: ${r.status} ${r.statusText}: ${await r.text()}`) + } + return await r.json() +} + +type OpenWebSocket = { + ws: WebSocket + receivedBuffers: Buffer[] + close: () => Promise + send: (data: string | Buffer) => Promise +} + +const openWebSocket = (url: string) => new Promise((resolve, reject) => { + const receivedBuffers: Buffer[] = [] + new WebSocket(url) + .on('error', reject) + .on('message', data => { + if (Buffer.isBuffer(data)) { + receivedBuffers.push(data) + } else if (Array.isArray(data)) { + receivedBuffers.push(...data) + } else { + receivedBuffers.push(Buffer.from(data)) + } + }) + .on('open', function onOpen() { + resolve({ + ws: this, + receivedBuffers, + close: () => new Promise(resolveClose => { + this.close() + this.once('close', () => { resolveClose() }) + }), + send: promisify(this.send.bind(this)), + }) + }) +}) + +describe('docker proxy', () => { + const { containerName } = setupDockerContainer() + const { serverBaseUrl } = setupDockerProxy() + + const waitForContainerId = async () => { + let containerId = '' + await waitForExpect(async () => { + const containers = await fetchJson(`http://${serverBaseUrl()}/containers/json`) as { Id: string; Names: string[] }[] + const container = containers.find(({ Names: names }) => names.includes(`/${containerName()}`)) + expect(container).toBeDefined() + containerId = container?.Id as string + }, 3000, 100) + return containerId + } + + test('use the docker API', async () => { + expect(await waitForContainerId()).toBeDefined() + }) + + describe('exec', () => { + const createExec = async (containerId: string, tty: boolean) => { + const { Id: execId } = await fetchJson(`http://${serverBaseUrl()}/containers/${containerId}/exec`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + Tty: tty, + Cmd: ['sh'], + }), + }) + + return execId + } + + let execId: string + let containerId: string + + beforeAll(async () => { + containerId = await waitForContainerId() + }) + + describe('tty=true', () => { + beforeAll(async () => { + execId = await createExec(containerId, true) + }) + + it('should communicate via websocket', async () => { + const { receivedBuffers, send, close } = await openWebSocket(`ws://${serverBaseUrl()}/exec/${execId}/start`) + await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0)) + await send('ls\n') + await waitForExpect(() => { + const received = Buffer.concat(receivedBuffers).toString('utf-8') + expect(received).toContain('#') + expect(received).toContain('ls') + expect(received).toContain('bin') + }) + await close() + }) + }) + + describe('tty=false', () => { + beforeAll(async () => { + execId = await createExec(containerId, false) + }) + + it('should communicate via websocket', async () => { + const { receivedBuffers, send, close } = await openWebSocket(`ws://${serverBaseUrl()}/exec/${execId}/start`) + await waitForExpect(async () => { + await send('ls\n') + const received = Buffer.concat(receivedBuffers).toString('utf-8') + expect(received).toContain('bin') + }) + await close() + }) + }) + }) + + describe('logs', () => { + let containerId: string + beforeAll(async () => { + containerId = await waitForContainerId() + }) + + const logStreams = ['stdout', 'stderr'] as const + type LogStream = typeof logStreams[number] + + const testStream = (...s: LogStream[]) => { + describe(`${s.join(' and ')}`, () => { + it(`should show the ${s.join(' and ')} logs via websocket`, async () => { + const { receivedBuffers, close } = await openWebSocket(`ws://${serverBaseUrl()}/containers/${containerId}/logs?${s.map(st => `${st}=true`).join('&')}`) + await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0)) + const length1 = receivedBuffers.length + await waitForExpect(() => { + const received = Buffer.concat(receivedBuffers).toString('utf-8') + s.forEach(st => { + expect(received).toContain(`hello ${st}`) + }) + logStreams.filter(st => !s.includes(st)).forEach(st => { + expect(received).not.toContain(`hello ${st}`) + }) + }) + await waitForExpect(() => { + expect(receivedBuffers.length).toBeGreaterThan(length1) + }) + await close() + }) + }) + } + + testStream('stdout') + testStream('stderr') + testStream('stdout', 'stderr') + + describe('timestamps', () => { + it('should show the logs with a timestamp', async () => { + const { receivedBuffers, close } = await openWebSocket(`ws://${serverBaseUrl()}/containers/${containerId}/logs?stdout=true×tamps=true`) + await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0)) + const received = Buffer.concat(receivedBuffers).toString('utf-8') + expect(received).toMatch(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d*Z/) + await close() + }) + }) + }) +}) diff --git a/packages/compose-tunnel-agent/src/http/docker-proxy/index.ts b/packages/compose-tunnel-agent/src/http/docker-proxy/index.ts new file mode 100644 index 00000000..b6e5a554 --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/docker-proxy/index.ts @@ -0,0 +1,77 @@ +import net from 'node:net' +import HttpProxy from 'http-proxy' +import { Logger } from 'pino' +import { inspect } from 'node:util' +import { WebSocketServer } from 'ws' +import Dockerode from 'dockerode' +import { findHandler, handlers as wsHandlers } from './ws' +import { tryHandler, tryUpgradeHandler } from '../http-server-helpers' + +export const createDockerProxyHandlers = ( + { log, dockerSocket, docker }: { + log: Logger + dockerSocket: string + docker: Dockerode + }, +) => { + const proxy = new HttpProxy({ + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + target: { + socketPath: dockerSocket, + }, + }) + + const wss = new WebSocketServer({ noServer: true }) + + wss.on('connection', async (ws, req) => { + const foundHandler = findHandler(wsHandlers, req) + if (!foundHandler) { + ws.close(404, 'Not found') + return undefined + } + + await foundHandler.handler.handler(ws, req, foundHandler.match, { log, docker }) + return undefined + }) + + const handler = tryHandler({ log }, async (req, res) => { + proxy.web(req, res) + }) + + const upgradeHandler = tryUpgradeHandler({ log }, async (req, socket, head) => { + const upgrade = req.headers.upgrade?.toLowerCase() + + if (upgrade === 'websocket') { + if (findHandler(wsHandlers, req)) { + wss.handleUpgrade(req, socket, head, client => { + wss.emit('connection', client, req) + }) + return undefined + } + + proxy.ws(req, socket, head, {}, err => { + log.warn('error in ws proxy %j', inspect(err)) + }) + return undefined + } + + if (upgrade === 'tcp') { + const targetSocket = net.createConnection({ path: dockerSocket }, () => { + const reqBuf = `${req.method} ${req.url} HTTP/${req.httpVersion}\r\n${Object.entries(req.headers).map(([k, v]) => `${k}: ${v}`).join('\r\n')}\r\n\r\n` + targetSocket.write(reqBuf) + targetSocket.write(head) + socket.pipe(targetSocket).pipe(socket) + }) + return undefined + } + + log.warn('invalid upgrade %s', upgrade) + socket.end(`Invalid upgrade ${upgrade}`) + return undefined + }) + + return { handler, upgradeHandler } +} + +export type DockerProxyHandlers = ReturnType diff --git a/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handler.ts b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handler.ts new file mode 100644 index 00000000..d3d3a845 --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handler.ts @@ -0,0 +1,32 @@ +import http from 'node:http' +import { Logger } from 'pino' +import WebSocket from 'ws' +import Dockerode from 'dockerode' + +type Context = { log: Logger; docker: Dockerode } +export type WsHandlerFunc = ( + ws: WebSocket, + req: http.IncomingMessage, + match: RegExpMatchArray, + { log, docker }: Context, +) => Promise + +export type WsHandler = { + matchRequest: RegExp + handler: WsHandlerFunc +} + +export const wsHandler = ( + matchRequest: RegExp, + handler: WsHandlerFunc +) => ({ matchRequest, handler }) + +export const findHandler = (handlers: WsHandler[], req: http.IncomingMessage) => { + for (const handler of handlers) { + const match = handler.matchRequest.exec(req.url ?? '') + if (match) { + return { handler, match } + } + } + return undefined +} diff --git a/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/exec.ts b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/exec.ts new file mode 100644 index 00000000..0aae4705 --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/exec.ts @@ -0,0 +1,42 @@ +import { inspect } from 'util' +import { createWebSocketStream } from 'ws' +import { parseQueryParams, queryParamBoolean } from '../../../query-params' +import { wsHandler } from '../handler' + +const handler = wsHandler( + /^\/exec\/([^/?]+)\/start($|\?)/, + async (ws, req, match, { log, docker }) => { + const id = match[1] + const { tty } = parseQueryParams(req.url ?? '') + const exec = docker.getExec(id) + const execStream = await exec.start({ + hijack: true, + stdin: true, + ...(tty !== undefined ? { Tty: queryParamBoolean(tty) } : {}), + }) + + execStream.on('close', () => { ws.close() }) + execStream.on('error', err => { log.warn('execStream error %j', inspect(err)) }) + ws.on('close', () => { execStream.destroy() }) + + const inspectResults = await exec.inspect() + log.debug('exec %s: %j', id, inspect(inspectResults)) + + const wsStream = createWebSocketStream(ws) + wsStream.on('error', err => { + const level = err.message.includes('WebSocket is not open') ? 'debug' : 'warn' + log[level]('wsStream error %j', inspect(err)) + }) + + if (inspectResults.ProcessConfig.tty) { + execStream.pipe(wsStream, { end: false }).pipe(execStream) + } else { + docker.modem.demuxStream(execStream, wsStream, wsStream) + wsStream.pipe(execStream) + } + + return undefined + }, +) + +export default handler diff --git a/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/logs.ts b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/logs.ts new file mode 100644 index 00000000..ba96e6ae --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/handlers/logs.ts @@ -0,0 +1,39 @@ +import { inspect } from 'util' +import { createWebSocketStream } from 'ws' +import { parseQueryParams, queryParamBoolean } from '../../../query-params' +import { wsHandler } from '../handler' + +const handler = wsHandler( + /^\/containers\/([^/?]+)\/logs($|\?)/, + async (ws, req, match, { log, docker }) => { + const id = match[1] + const { stdout, stderr, since, until, timestamps, tail } = parseQueryParams(req.url ?? '') + const abort = new AbortController() + const logStream = await docker.getContainer(id).logs({ + stdout: queryParamBoolean(stdout), + stderr: queryParamBoolean(stderr), + since, + until, + timestamps: queryParamBoolean(timestamps), + tail: tail !== undefined ? Number(tail) : undefined, + follow: true, + abortSignal: abort.signal, + }) + + logStream.on('close', async () => { ws.close() }) + logStream.on('error', err => { + if (err.message !== 'aborted') { + log.error('logs stream error %j', inspect(err)) + } + }) + ws.on('close', () => { abort.abort() }) + + const wsStream = createWebSocketStream(ws) + wsStream.on('error', err => { log.error('wsStream error %j', inspect(err)) }) + docker.modem.demuxStream(logStream, wsStream, wsStream) + + return undefined + }, +) + +export default handler diff --git a/packages/compose-tunnel-agent/src/http/docker-proxy/ws/index.ts b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/index.ts new file mode 100644 index 00000000..c3acb87a --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/docker-proxy/ws/index.ts @@ -0,0 +1,10 @@ +import { WsHandler } from './handler' +import exec from './handlers/exec' +import logs from './handlers/logs' + +export const handlers: WsHandler[] = [ + exec, + logs, +] + +export { findHandler } from './handler' diff --git a/packages/compose-tunnel-agent/src/http/http-server-helpers.ts b/packages/compose-tunnel-agent/src/http/http-server-helpers.ts new file mode 100644 index 00000000..a1023de2 --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/http-server-helpers.ts @@ -0,0 +1,117 @@ +import { Logger } from '@preevy/common' +import http from 'node:http' +import stream from 'node:stream' +import { inspect } from 'node:util' + +export const respond = (res: http.ServerResponse, content: string, type = 'text/plain', status = 200) => { + res.writeHead(status, { 'Content-Type': type }) + res.end(content) +} + +export const respondJson = ( + res: http.ServerResponse, + content: unknown, + status = 200, +) => respond(res, JSON.stringify(content), 'application/json', status) + +export const respondAccordingToAccept = ( + req: http.IncomingMessage, + res: http.ServerResponse, + message: string, + status = 200, +) => (req.headers.accept?.toLowerCase().includes('json') + ? respondJson(res, { message }, status) + : respond(res, message, 'text/plain', status)) + +export class HttpError extends Error { + constructor( + readonly status: number, + readonly clientMessage: string, + readonly cause?: unknown, + readonly responseHeaders?: Record + ) { + super(clientMessage) + } +} + +export class NotFoundError extends HttpError { + static defaultMessage = 'Not found' + constructor(clientMessage = NotFoundError.defaultMessage) { + super(404, clientMessage) + } +} + +export class InternalError extends HttpError { + static status = 500 + static defaultMessage = 'Internal error' + constructor(err: unknown, clientMessage = InternalError.defaultMessage) { + super(InternalError.status, clientMessage, err) + } +} + +export class BadGatewayError extends HttpError { + static status = 502 + static defaultMessage = 'Bad gateway' + constructor(clientMessage = InternalError.defaultMessage) { + super(BadGatewayError.status, clientMessage) + } +} + +export class BadRequestError extends HttpError { + static status = 400 + static defaultMessage = 'Bad request' + constructor(reason?: string) { + super(BadGatewayError.status, reason ? `${BadRequestError.defaultMessage}: ${reason}` : BadRequestError.defaultMessage) + } +} + +export const errorHandler = ( + log: Logger, + err: unknown, + req: http.IncomingMessage, + res: http.ServerResponse, +) => { + const [clientMessage, status, responseHeaders] = err instanceof HttpError + ? [err.clientMessage, err.status, err.responseHeaders] + : [InternalError.defaultMessage, InternalError.status, undefined] + + respondAccordingToAccept(req, res, clientMessage, status) + Object.entries(responseHeaders || {}).forEach(([k, v]) => res.setHeader(k, v)) + log.warn('caught error: %j in %s %s', inspect(err), req.method || '', req.url || '') +} + +export const tryHandler = ( + { log }: { log: Logger }, + f: (req: http.IncomingMessage, res: http.ServerResponse) => Promise +) => async (req: http.IncomingMessage, res: http.ServerResponse) => { + try { + await f(req, res) + } catch (err) { + errorHandler(log, err, req, res) + } +} + +export const errorUpgradeHandler = ( + log: Logger, + err: unknown, + req: http.IncomingMessage, + socket: stream.Duplex, +) => { + const message: string = err instanceof HttpError + ? err.clientMessage + : InternalError.defaultMessage + + socket.end(message) + log.warn('caught error: %j in upgrade %s %s', inspect(err), req.method || '', req.url || '') +} + +export const tryUpgradeHandler = ( + { log }: { log: Logger }, + f: (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => Promise +) => async (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => { + try { + await f(req, socket, head) + } catch (err) { + errorUpgradeHandler(log, err, req, socket) + } +} diff --git a/packages/compose-tunnel-agent/src/http/index.ts b/packages/compose-tunnel-agent/src/http/index.ts new file mode 100644 index 00000000..a7be7d99 --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/index.ts @@ -0,0 +1,35 @@ +import http from 'node:http' +import pino from 'pino' +import { NotFoundError, tryHandler, tryUpgradeHandler } from './http-server-helpers' +import { DockerProxyHandlers } from './docker-proxy' + +export const httpServerHandlers = ( + { log, dockerProxyHandlers, apiHandler, dockerProxyPrefix }: { + log: pino.Logger + dockerProxyHandlers: DockerProxyHandlers + dockerProxyPrefix: `/${string}/` + apiHandler: http.RequestListener + } +) => { + const removeDockerPrefix = (s: string) => `/${s.substring(dockerProxyPrefix.length)}` + + const handler = tryHandler({ log }, async (req, res) => { + log.debug('request %s %s', req.method, req.url) + if (req.url?.startsWith(dockerProxyPrefix)) { + req.url = removeDockerPrefix(req.url) + return await dockerProxyHandlers.handler(req, res) + } + return apiHandler(req, res) + }) + + const upgradeHandler = tryUpgradeHandler({ log }, async (req, socket, head) => { + log.debug('upgrade %s %s', req.method, req.url) + if (req.url?.startsWith(dockerProxyPrefix)) { + req.url = removeDockerPrefix(req.url) + return await dockerProxyHandlers.upgradeHandler(req, socket, head) + } + throw new NotFoundError() + }) + + return { handler, upgradeHandler } +} diff --git a/packages/compose-tunnel-agent/src/http/query-params.ts b/packages/compose-tunnel-agent/src/http/query-params.ts new file mode 100644 index 00000000..8edb39df --- /dev/null +++ b/packages/compose-tunnel-agent/src/http/query-params.ts @@ -0,0 +1,20 @@ +import { defaults } from 'lodash' +import url from 'node:url' + +export const parseQueryParams = < +T extends Record +>(requestUrl: string, defaultValues: Partial = {}) => { + const { search } = url.parse(requestUrl) + const queryParams = new URLSearchParams(search || '') + return defaults(Object.fromEntries(queryParams), defaultValues) +} + +export const queryParamBoolean = (v: string | boolean | undefined, defaultValue = false): boolean => { + if (typeof v === 'boolean') { + return v + } + if (typeof v === 'undefined' || v === '') { + return defaultValue + } + return v === '1' || v.toLowerCase() === 'true' +} diff --git a/packages/compose-tunnel-agent/src/web.ts b/packages/compose-tunnel-agent/src/web.ts deleted file mode 100644 index 725ba2e1..00000000 --- a/packages/compose-tunnel-agent/src/web.ts +++ /dev/null @@ -1,45 +0,0 @@ -import http from 'node:http' -import { Logger } from '@preevy/common' -import { SshState } from './ssh/index' - -const respond = (res: http.ServerResponse, content: string, type = 'text/plain', status = 200) => { - res.writeHead(status, { 'Content-Type': type }) - res.end(content) -} - -const respondJson = ( - res: http.ServerResponse, - content: unknown, - status = 200, -) => respond(res, JSON.stringify(content), 'application/json', status) - -const respondNotFound = (res: http.ServerResponse) => respond(res, 'Not found', 'text/plain', 404) - -const createWebServer = ({ - log, currentSshState, -}: { - log: Logger - currentSshState: ()=> Promise -}) => http.createServer(async (req, res) => { - log.debug('web request URL: %j', req.url) - - if (!req.url) { - respondNotFound(res) - return - } - const [path] = req.url.split('?') - - if (path === '/tunnels') { - respondJson(res, await currentSshState()) - return - } - - if (path === '/healthz') { - respond(res, 'OK') - return - } - - respondNotFound(res) -}) - -export default createWebServer diff --git a/packages/core/src/commands/urls.ts b/packages/core/src/commands/urls.ts index a7dd78b0..0a596733 100644 --- a/packages/core/src/commands/urls.ts +++ b/packages/core/src/commands/urls.ts @@ -11,6 +11,7 @@ export const urls = async ({ tunnelingKey, includeAccessCredentials, retryOpts, + showPreevyService, }: { envId: string rootUrl: string @@ -19,6 +20,7 @@ export const urls = async ({ tunnelingKey: string | Buffer includeAccessCredentials: boolean retryOpts: retry.Options + showPreevyService: boolean }) => { const tunnelUrlsForService = tunnelUrlsForEnv({ envId, rootUrl: new URL(rootUrl), clientId }) @@ -29,6 +31,7 @@ export const urls = async ({ retryOpts, credentials, includeAccessCredentials, + showPreevyService, }) return flattenTunnels(tunnels) diff --git a/packages/core/src/compose-tunnel-agent-client.ts b/packages/core/src/compose-tunnel-agent-client.ts index 43a3a340..527f1478 100644 --- a/packages/core/src/compose-tunnel-agent-client.ts +++ b/packages/core/src/compose-tunnel-agent-client.ts @@ -1,6 +1,7 @@ import path from 'path' import fetch from 'node-fetch' import retry from 'p-retry' +import util from 'util' import { mapValues } from 'lodash' import { ComposeModel, ComposeService } from './compose/model' import { TunnelOpts } from './ssh/url' @@ -8,7 +9,7 @@ import { Tunnel } from './tunneling' import { withBasicAuthCredentials } from './url' export const COMPOSE_TUNNEL_AGENT_SERVICE_NAME = 'preevy_proxy' -export const COMPOSE_TUNNEL_AGENT_SERVICE_PORT = 3000 +export const COMPOSE_TUNNEL_AGENT_PORT = 3000 const COMPOSE_TUNNEL_AGENT_DIR = path.join(path.dirname(require.resolve('@preevy/compose-tunnel-agent')), '..') const baseDockerProxyService: ComposeService = { @@ -59,7 +60,7 @@ export const addComposeTunnelAgentService = ( ports: [ { mode: 'ingress', - target: 3000, + target: COMPOSE_TUNNEL_AGENT_PORT, published: '0', protocol: 'tcp', }, @@ -91,7 +92,7 @@ export const addComposeTunnelAgentService = ( TLS_SERVERNAME: tunnelOpts.tlsServerName, TUNNEL_URL_SUFFIX: urlSuffix, PREEVY_ENV_ID: envId, - PORT: COMPOSE_TUNNEL_AGENT_SERVICE_PORT.toString(), + PORT: COMPOSE_TUNNEL_AGENT_PORT.toString(), ...debug ? { DEBUG: '1' } : {}, HOME: '/preevy', }, @@ -104,16 +105,24 @@ export const queryTunnels = async ({ tunnelUrlsForService, credentials, includeAccessCredentials, + showPreevyService, }: { tunnelUrlsForService: (service: { name: string; ports: number[] }) => { port: number; url: string }[] credentials: { user: string; password: string } retryOpts?: retry.Options includeAccessCredentials: boolean + showPreevyService: boolean }) => { - const serviceUrl = tunnelUrlsForService({ + const serviceUrls = tunnelUrlsForService({ name: COMPOSE_TUNNEL_AGENT_SERVICE_NAME, - ports: [COMPOSE_TUNNEL_AGENT_SERVICE_PORT], - })[0].url.replace(/\/$/, '') + ports: [COMPOSE_TUNNEL_AGENT_PORT], + }) + + const serviceUrl = serviceUrls.find(({ port }) => port === COMPOSE_TUNNEL_AGENT_PORT)?.url.replace(/\/$/, '') + + if (!serviceUrl) { + throw new Error(`Cannot find compose tunnel agent API service URL in: ${util.inspect(serviceUrls)}`) + } const addCredentials = withBasicAuthCredentials(credentials) const url = addCredentials(`${serviceUrl}/tunnels`) @@ -128,7 +137,7 @@ export const queryTunnels = async ({ return { tunnels: tunnels - .filter(({ service }: Tunnel) => service !== COMPOSE_TUNNEL_AGENT_SERVICE_NAME) + .filter(({ service }: Tunnel) => showPreevyService || service !== COMPOSE_TUNNEL_AGENT_SERVICE_NAME) .map(tunnel => ({ ...tunnel, ports: mapValues(tunnel.ports, includeAccessCredentials ? addCredentials : (x: string) => x), diff --git a/tunnel-server/index.ts b/tunnel-server/index.ts index 06328fbf..cf390bcc 100644 --- a/tunnel-server/index.ts +++ b/tunnel-server/index.ts @@ -13,7 +13,7 @@ import { appLoggerFromEnv } from './src/logging' import { tunnelsGauge, runMetricsServer } from './src/metrics' import { numberFromEnv, requiredEnv } from './src/env' import { replaceHostname } from './src/url' -import { sessionStore } from './src/session' +import { cookieSessionStore } from './src/session' import { claimsSchema } from './src/auth' const __dirname = url.fileURLToPath(new URL('.', import.meta.url)) @@ -37,15 +37,15 @@ const BASE_URL = (() => { })() const envStore = inMemoryPreviewEnvStore() -const appSessionManager = sessionStore({ domain: BASE_URL.hostname, schema: claimsSchema, keys: process.env.COOKIE_SECRETS?.split(' ') }) +const appSessionStore = cookieSessionStore({ domain: BASE_URL.hostname, schema: claimsSchema, keys: process.env.COOKIE_SECRETS?.split(' ') }) const loginUrl = new URL('/login', replaceHostname(BASE_URL, `auth.${BASE_URL.hostname}`)).toString() const app = createApp({ - session: appSessionManager, + sessionStore: appSessionStore, envStore, baseUrl: BASE_URL, isProxyRequest: isProxyRequest(BASE_URL.hostname), - proxyHandlers: proxyHandlers({ envStore, logger, loginUrl, sessionManager: appSessionManager }), - logger, + proxyHandlers: proxyHandlers({ envStore, log: logger, loginUrl, sessionStore: appSessionStore }), + log: logger, }) const sshLogger = logger.child({ name: 'ssh_server' }) diff --git a/tunnel-server/src/app.ts b/tunnel-server/src/app.ts index 9f2cff6b..fb2b557f 100644 --- a/tunnel-server/src/app.ts +++ b/tunnel-server/src/app.ts @@ -3,41 +3,47 @@ import { fastifyRequestContext } from '@fastify/request-context' import http from 'http' import internal from 'stream' import { Logger } from 'pino' -import { match } from 'ts-pattern' import { SessionStore } from './session' import { Claims, JwtAuthenticator, authenticator, getIssuerToKeyDataFromEnv, unauthorized } from './auth' import { PreviewEnvStore } from './preview-env' import { replaceHostname } from './url' -export const app = ({ isProxyRequest, proxyHandlers, session: sessionManager, baseUrl, envStore, logger }: { +export const app = ({ isProxyRequest, proxyHandlers, sessionStore, baseUrl, envStore, log }: { isProxyRequest: (req: http.IncomingMessage) => boolean - logger: Logger + log: Logger baseUrl: URL - session: SessionStore + sessionStore: SessionStore envStore: PreviewEnvStore proxyHandlers: { - wsHandler: (req: http.IncomingMessage, socket: internal.Duplex, head: Buffer) => void + upgradeHandler: (req: http.IncomingMessage, socket: internal.Duplex, head: Buffer) => void handler: (req: http.IncomingMessage, res: http.ServerResponse) => void } }) => Fastify({ serverFactory: handler => { - const { wsHandler: proxyWsHandler, handler: proxyHandler } = proxyHandlers - const server = http.createServer((req, res) => match(req) - .when(r => r.headers.host?.startsWith('auth.'), () => handler(req, res)) - .when(isProxyRequest, () => proxyHandler(req, res)) - .otherwise(() => handler(req, res))) - server.on('upgrade', (req, socket, head) => { - if (isProxyRequest(req)) { - proxyWsHandler(req, socket, head) - } else { - logger.warn('unexpected upgrade request %j', { url: req.url, host: req.headers.host }) - socket.end() + const { upgradeHandler: proxyUpgradeHandler, handler: proxyHandler } = proxyHandlers + const server = http.createServer((req, res) => { + if (req.url !== '/healthz') { + log.debug('request %j', { method: req.method, url: req.url, headers: req.headers }) } + if (!req.headers.host?.startsWith('auth.') && isProxyRequest(req)) { + return proxyHandler(req, res) + } + return handler(req, res) }) + .on('upgrade', (req, socket, head) => { + log.debug('upgrade', req.url) + if (isProxyRequest(req)) { + return proxyUpgradeHandler(req, socket, head) + } + + log.warn('upgrade request %j not found', { url: req.url, host: req.headers.host }) + socket.end('Not found') + return undefined + }) return server }, - logger, + logger: log, }) .register(fastifyRequestContext) .get<{Querystring: {env: string; returnPath?: string}}>('/login', { @@ -61,9 +67,9 @@ export const app = ({ isProxyRequest, proxyHandlers, session: sessionManager, ba res.statusCode = 404 return { error: 'unknown envId' } } - const session = sessionManager(req.raw, res.raw, env.publicKeyThumbprint) + const session = sessionStore(req.raw, res.raw, env.publicKeyThumbprint) if (!session.user) { - const auth = authenticator([JwtAuthenticator(getIssuerToKeyDataFromEnv(env, logger))]) + const auth = authenticator([JwtAuthenticator(getIssuerToKeyDataFromEnv(env, log))]) const result = await auth(req.raw) if (!result.isAuthenticated) { return unauthorized(res.raw) diff --git a/tunnel-server/src/auth.ts b/tunnel-server/src/auth.ts index 8ce8ce40..31d51c32 100644 --- a/tunnel-server/src/auth.ts +++ b/tunnel-server/src/auth.ts @@ -6,6 +6,7 @@ import Cookies from 'cookies' import { KeyObject } from 'crypto' import type { Logger } from 'pino' import { PreviewEnv } from './preview-env' +import { HttpError } from './http-server-helpers' export class AuthError extends Error {} @@ -115,12 +116,23 @@ export function authenticator(authenticators: ((req: IncomingMessage)=> Promise< } } +// TODO: combine with UnauthorizedError export const unauthorized = (res: ServerResponse) => { res.setHeader('WWW-Authenticate', 'Basic realm="Secure Area"') res.statusCode = 401 res.end('Unauthorized') } +export class UnauthorizedError extends HttpError { + static status = 401 + static defaultMessage = 'Unauthorized' + constructor(readonly reason: string) { + super(UnauthorizedError.status, UnauthorizedError.defaultMessage, undefined, { + 'WWW-Authenticate': 'Basic realm="Secure Area"', + }) + } +} + export const getIssuerToKeyDataFromEnv = (env: PreviewEnv, log: Logger): IssuerToKeyData => iss => { const expectedIssuer = `preevy://${env.publicKeyThumbprint}` if (iss !== expectedIssuer) { diff --git a/tunnel-server/src/errors.ts b/tunnel-server/src/errors.ts deleted file mode 100644 index 68b4851c..00000000 --- a/tunnel-server/src/errors.ts +++ /dev/null @@ -1,32 +0,0 @@ -export type ErrorWithCode = Error & { statusCode: number } - -export class BaseErrorWithCode extends Error implements ErrorWithCode { - // noinspection JSUnusedGlobalSymbols - constructor(clientMessage: string, readonly statusCode: number) { - super(clientMessage) - } -} - -export class InternalServerError extends BaseErrorWithCode { - constructor(clientMessage: string, statusCode: 500 | 501 | 502 | 503 = 500) { - super(clientMessage, statusCode) - } -} - -export class NotFoundError extends BaseErrorWithCode { - constructor(resource: string) { - super(`${resource} not found`, 404) - } -} - -export class ForbiddenError extends BaseErrorWithCode { - constructor(reason?: string) { - super(['forbidden', reason].filter(Boolean).join(': '), 403) - } -} - -export class UnauthorizedError extends BaseErrorWithCode { - constructor(reason: string) { - super(`unauthorized: ${reason}`, 401) - } -} diff --git a/tunnel-server/src/http-server-helpers.ts b/tunnel-server/src/http-server-helpers.ts new file mode 100644 index 00000000..ee12456f --- /dev/null +++ b/tunnel-server/src/http-server-helpers.ts @@ -0,0 +1,117 @@ +import { Logger } from 'pino' +import http from 'node:http' +import stream from 'node:stream' +import { inspect } from 'node:util' + +export const respond = (res: http.ServerResponse, content: string, type = 'text/plain', status = 200) => { + res.writeHead(status, { 'Content-Type': type }) + res.end(content) +} + +export const respondJson = ( + res: http.ServerResponse, + content: unknown, + status = 200, +) => respond(res, JSON.stringify(content), 'application/json', status) + +export const respondAccordingToAccept = ( + req: http.IncomingMessage, + res: http.ServerResponse, + message: string, + status = 200, +) => (req.headers.accept?.toLowerCase().includes('json') + ? respondJson(res, { message }, status) + : respond(res, message, 'text/plain', status)) + +export class HttpError extends Error { + constructor( + readonly status: number, + readonly clientMessage: string, + readonly cause?: unknown, + readonly responseHeaders?: Record + ) { + super(clientMessage) + } +} + +export class NotFoundError extends HttpError { + static defaultMessage = 'Not found' + constructor(clientMessage = NotFoundError.defaultMessage) { + super(404, clientMessage) + } +} + +export class InternalError extends HttpError { + static status = 500 + static defaultMessage = 'Internal error' + constructor(err: unknown, clientMessage = InternalError.defaultMessage) { + super(InternalError.status, clientMessage, err) + } +} + +export class BadGatewayError extends HttpError { + static status = 502 + static defaultMessage = 'Bad gateway' + constructor(clientMessage = InternalError.defaultMessage) { + super(BadGatewayError.status, clientMessage) + } +} + +export class BadRequestError extends HttpError { + static status = 400 + static defaultMessage = 'Bad request' + constructor(reason?: string) { + super(BadGatewayError.status, reason ? `${BadRequestError.defaultMessage}: ${reason}` : BadRequestError.defaultMessage) + } +} + +export const errorHandler = ( + log: Logger, + err: unknown, + req: http.IncomingMessage, + res: http.ServerResponse, +) => { + const [clientMessage, status, responseHeaders] = err instanceof HttpError + ? [err.clientMessage, err.status, err.responseHeaders] + : [InternalError.defaultMessage, InternalError.status, undefined] + + Object.entries(responseHeaders || {}).forEach(([k, v]) => res.setHeader(k, v)) + respondAccordingToAccept(req, res, clientMessage, status) + log.warn('caught error: %j in %s %s', inspect(err), req.method || '', req.url || '') +} + +export const tryHandler = ( + { log }: { log: Logger }, + f: (req: http.IncomingMessage, res: http.ServerResponse) => Promise +) => async (req: http.IncomingMessage, res: http.ServerResponse) => { + try { + await f(req, res) + } catch (err) { + errorHandler(log, err, req, res) + } +} + +export const errorUpgradeHandler = ( + log: Logger, + err: unknown, + req: http.IncomingMessage, + socket: stream.Duplex, +) => { + const message: string = err instanceof HttpError + ? err.clientMessage + : InternalError.defaultMessage + + socket.end(message) + log.warn('caught error: %j in upgrade %s %s', inspect(err), req.method || '', req.url || '') +} + +export const tryUpgradeHandler = ( + { log }: { log: Logger }, + f: (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => Promise +) => async (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => { + try { + await f(req, socket, head) + } catch (err) { + errorUpgradeHandler(log, err, req, socket) + } +} diff --git a/tunnel-server/src/proxy.ts b/tunnel-server/src/proxy.ts index a3f8b09a..cf206dd1 100644 --- a/tunnel-server/src/proxy.ts +++ b/tunnel-server/src/proxy.ts @@ -1,30 +1,19 @@ import httpProxy from 'http-proxy' import { IncomingMessage, ServerResponse } from 'http' +import net from 'net' import internal from 'stream' import type { Logger } from 'pino' import { inspect } from 'util' import { PreviewEnvStore } from './preview-env' import { requestsCounter } from './metrics' -import { Claims, authenticator, JwtAuthenticator, unauthorized, getIssuerToKeyDataFromEnv, AuthenticationResult, AuthError } from './auth' +import { Claims, authenticator, JwtAuthenticator, getIssuerToKeyDataFromEnv, AuthenticationResult, AuthError, UnauthorizedError } from './auth' import { SessionStore } from './session' +import { BadGatewayError, BadRequestError, errorHandler, errorUpgradeHandler, tryHandler, tryUpgradeHandler } from './http-server-helpers' export const isProxyRequest = ( hostname: string, ) => (req: IncomingMessage) => Boolean(req.headers.host?.split(':')?.[0]?.endsWith(`.${hostname}`)) -function asyncHandler( - fn: (...args: TArgs) => Promise, - onError: (error: unknown, ...args: TArgs)=> void, -) { - return async (...args: TArgs) => { - try { - await fn(...args) - } catch (err) { - onError(err, ...args) - } - } -} - function loginRedirector(loginUrl:string) { return (res: ServerResponse, env: string, returnPath?: string) => { res.statusCode = 307 @@ -42,13 +31,13 @@ function loginRedirector(loginUrl:string) { export function proxyHandlers({ envStore, loginUrl, - sessionManager, - logger, + sessionStore, + log, }: { - sessionManager: SessionStore + sessionStore: SessionStore envStore: PreviewEnvStore loginUrl: string - logger: Logger + log: Logger }) { const proxy = httpProxy.createProxy({}) const redirectToLogin = loginRedirector(loginUrl) @@ -58,38 +47,37 @@ export function proxyHandlers({ const targetHost = host?.split('.', 1)[0] const env = await envStore.get(targetHost as string) if (!env) { - logger.warn('no env for %j', { targetHost, url }) + log.warn('no env for %j', { targetHost, url }) return undefined } return env } return { - handler: asyncHandler(async (req: IncomingMessage, res: ServerResponse) => { + handler: tryHandler({ log }, async (req: IncomingMessage, res: ServerResponse) => { const env = await resolveTargetEnv(req) if (!env) { - res.statusCode = 502 - res.end() - return undefined + throw new BadGatewayError() } - const session = sessionManager(req, res, env.publicKeyThumbprint) + const session = sessionStore(req, res, env.publicKeyThumbprint) if (env.access === 'private') { if (!session.user) { - const authenticate = authenticator([JwtAuthenticator(getIssuerToKeyDataFromEnv(env, logger))]) + const authenticate = authenticator([ + JwtAuthenticator(getIssuerToKeyDataFromEnv(env, log)), + ]) let authResult: AuthenticationResult try { authResult = await authenticate(req) } catch (e) { if (e instanceof AuthError) { - res.statusCode = 400 - logger.warn('Auth error %j', inspect(e)) - res.end(`Auth error: ${e.message}`) - return undefined + log.warn('Auth error %j', inspect(e)) + throw new BadRequestError(`Auth error: ${e.message}`) } throw e } if (!authResult.isAuthenticated) { - return unauthorized(res) + log.debug('unauthorized request: %j %j %j', req.url, req.method, req.headers) + throw new UnauthorizedError('invalid auth') } session.set(authResult.claims) if (authResult.login && req.method === 'GET') { @@ -103,11 +91,11 @@ export function proxyHandlers({ } if (session.user?.role !== 'admin') { - return unauthorized(res) + throw new UnauthorizedError('not admin') } } - logger.debug('proxying to %j', { target: env.target, url: req.url }) + log.debug('proxying to %j', { target: env.target, url: req.url }) requestsCounter.inc({ clientId: env.clientId }) return proxy.web( @@ -120,42 +108,56 @@ export function proxyHandlers({ socketPath: env.target, }, }, - err => { - logger.warn('error in proxy %j', { error: err, targetHost: env.target, url: req.url }) - res.statusCode = 502 - res.end(`error proxying request: ${(err as unknown as { code: unknown }).code}`) - } + err => errorHandler(log, err, req, res) ) - }, err => { logger.error('error forwarding traffic %j', inspect(err)) }), - wsHandler: asyncHandler(async (req: IncomingMessage, socket: internal.Duplex, head: Buffer) => { + }), + + upgradeHandler: tryUpgradeHandler({ log }, async (req: IncomingMessage, socket: internal.Duplex, head: Buffer) => { const env = await resolveTargetEnv(req) if (!env) { - socket.end() - return undefined + log.warn('env not found for upgrade %j', req.url) + throw new BadGatewayError() } + log.debug('upgrade handler %j', { url: req.url, env, headers: req.headers }) + if (env.access === 'private') { - const session = sessionManager(req, undefined as any, env.clientId) + const session = sessionStore(req, undefined as any, env.publicKeyThumbprint) if (session.user?.role !== 'admin') { - socket.end() - return undefined + log.debug('unauthorized upgrade - not admin %j %j %j', req.url, req.method, req.headers) + throw new UnauthorizedError('not admin') } } - return proxy.ws( - req, - socket, - head, - { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - target: { - socketPath: env.target, + + const upgrade = req.headers.upgrade?.toLowerCase() + + if (upgrade === 'websocket') { + return proxy.ws( + req, + socket, + head, + { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + target: { + socketPath: env.target, + }, }, - }, - err => { - logger.warn('error in ws proxy %j', { error: err, targetHost: env.target, url: req.url }) - } - ) - }, err => logger.error('error forwarding ws traffic %j', { error: err })), + err => errorUpgradeHandler(log, err, req, socket) + ) + } + + if (upgrade === 'tcp') { + const targetSocket = net.createConnection({ path: env.target }, () => { + const reqBuf = `${req.method} ${req.url} HTTP/${req.httpVersion}\r\n${Object.entries(req.headers).map(([k, v]) => `${k}: ${v}`).join('\r\n')}\r\n\r\n` + targetSocket.write(reqBuf) + targetSocket.write(head) + socket.pipe(targetSocket).pipe(socket) + }) + return undefined + } + + throw new BadRequestError('Unsupported upgrade header') + }), } } diff --git a/tunnel-server/src/session.ts b/tunnel-server/src/session.ts index 00804fd6..eabdbae9 100644 --- a/tunnel-server/src/session.ts +++ b/tunnel-server/src/session.ts @@ -4,15 +4,19 @@ import { randomBytes } from 'crypto' import * as z from 'zod' // for testing, for production workload use the env var COOKIE_SECRETS -function generateSecret() { +function generateInsecureSecret() { return randomBytes(32) .toString('base64') .slice(0, 32) } -export function sessionStore(opts: {domain: string; schema: z.ZodSchema; keys?: string[] }) { - const keys = opts.keys ?? [generateSecret()] - return function getSession(req: IncomingMessage, res: ServerResponse, thumbprint: string) { +export function cookieSessionStore(opts: {domain: string; schema: z.ZodSchema; keys?: string[] }) { + const keys = opts.keys ?? [generateInsecureSecret()] + return function getSession( + req: IncomingMessage, + res: ServerResponse, + thumbprint: string + ) { const cookies = new Cookies(req, res, { secure: true, keys, @@ -32,4 +36,4 @@ export function sessionStore(opts: {domain: string; schema: z.ZodSchema; k } } -export type SessionStore = ReturnType> +export type SessionStore = ReturnType> diff --git a/tunnel-server/src/ssh-server.ts b/tunnel-server/src/ssh-server.ts index a8924ae6..2350e5da 100644 --- a/tunnel-server/src/ssh-server.ts +++ b/tunnel-server/src/ssh-server.ts @@ -92,7 +92,7 @@ export const sshServer = ( const serverEmitter = new EventEmitter() as Omit const server = new ssh2.Server( { - debug: x => log.debug(x), + // debug: x => log.debug(x), // keepaliveInterval: 1000, // keepaliveCountMax: 5, hostKeys: [sshPrivateKey], @@ -182,7 +182,7 @@ export const sshServer = ( parsed, () => new Promise((resolveForward, rejectForward) => { const socketServer = net.createServer(socket => { - log.debug('socketServer connected %j', socket) + log.debug('socketServer connected') client.openssh_forwardOutStreamLocal( request, (err, upstream) => { diff --git a/yarn.lock b/yarn.lock index db5c7af3..e5cf3255 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4613,6 +4613,13 @@ resolved "https://registry.yarnpkg.com/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz#0ea7b61496902b95890dc4c3a116b60cb8dae812" integrity sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ== +"@types/http-proxy@^1.17.9": + version "1.17.11" + resolved "https://registry.yarnpkg.com/@types/http-proxy/-/http-proxy-1.17.11.tgz#0ca21949a5588d55ac2b659b69035c84bd5da293" + integrity sha512-HC8G7c1WmaF2ekqpnFq626xd3Zz0uvaqFmBJNRZCGEZCXkvSdJoNFn/8Ygbd9fKNQj8UzLdCETaI0UWPAjK7IA== + dependencies: + "@types/node" "*" + "@types/inquirer@^8.0.0": version "8.2.6" resolved "https://registry.yarnpkg.com/@types/inquirer/-/inquirer-8.2.6.tgz#abd41a5fb689c7f1acb12933d787d4262a02a0ab" @@ -13942,6 +13949,11 @@ vinyl@^2.0.1: remove-trailing-separator "^1.0.1" replace-ext "^1.0.0" +wait-for-expect@^3.0.2: + version "3.0.2" + resolved "https://registry.yarnpkg.com/wait-for-expect/-/wait-for-expect-3.0.2.tgz#d2f14b2f7b778c9b82144109c8fa89ceaadaa463" + integrity sha512-cfS1+DZxuav1aBYbaO/kE06EOS8yRw7qOFoD3XtjTkYvCvh3zUvNST8DXK/nPaeqIzIv3P3kL3lRJn8iwOiSag== + walk-up-path@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/walk-up-path/-/walk-up-path-1.0.0.tgz#d4745e893dd5fd0dbb58dd0a4c6a33d9c9fec53e" @@ -14145,7 +14157,7 @@ write-pkg@4.0.0: type-fest "^0.4.1" write-json-file "^3.2.0" -ws@^8.11.0: +ws@^8.11.0, ws@^8.13.0: version "8.13.0" resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0" integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==