Skip to content

Commit

Permalink
tunnel server: fix duplicate request
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Jul 26, 2023
1 parent ab45f3f commit ef3bb10
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 54 deletions.
69 changes: 49 additions & 20 deletions tunnel-server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ const app = createApp({
proxyHandlers: proxyHandlers({ envStore, logger }),
logger,
})
const sshLogger = logger.child({ name: 'ssh_server' })
const sshLog = logger.child({ name: 'ssh_server' })

const tunnelName = (clientId: string, remotePath: string) => {
const tunnelName = (hostnameSuffix: string, remotePath: string) => {
const serviceName = remotePath.replace(/^\//, '')
return `${serviceName}-${clientId}`.toLowerCase()
return `${serviceName}-${hostnameSuffix}`.toLowerCase()
}

const tunnelUrl = (
Expand All @@ -53,43 +53,72 @@ const tunnelUrl = (
tunnel: string,
) => replaceHostname(rootUrl, `${tunnelName(clientId, tunnel)}.${rootUrl.hostname}`).toString()

const tunnelsPerClientUniqueId = new Map<string, Map<string, { closeForward:() => void }>>()

const sshServer = createSshServer({
log: sshLogger,
log: sshLog,
sshPrivateKey,
socketDir: '/tmp', // TODO
})
.on('client', client => {
const { clientId, publicKey } = client
const tunnels = new Map<string, string>()
const { hostnameSuffix, publicKey, uniqueId } = client
const clientLog = sshLog.child({ uniqueClientId: uniqueId })
const tunnels = new Map<string, { tunnelUrl: string; closeForward:() => void }>()
tunnelsPerClientUniqueId.set(uniqueId, tunnels)
client
.on('forward', async (requestId, { path: remotePath, access }, accept, reject) => {
const key = tunnelName(clientId, remotePath)
if (await envStore.has(key)) {
reject(new Error(`duplicate path: ${key}`))
return
const forwardLog = clientLog.child({ forwardId: requestId })
const key = tunnelName(hostnameSuffix, remotePath)
const existingEntry = await envStore.get(key)
if (existingEntry) {
if (existingEntry.clientUniqueId === uniqueId) {
reject(new Error(`duplicate request ${requestId} for client ${uniqueId} suffix ${hostnameSuffix}`))
return
}
forwardLog.warn('forward: overriding duplicate envStore entry for path %s: %j', key, existingEntry)
await envStore.delete(key, existingEntry.clientUniqueId)

// close tunnel of overridden client
tunnelsPerClientUniqueId.get(existingEntry.clientUniqueId)?.get(requestId)?.closeForward()
}
const forward = await accept()
sshLogger.debug('creating tunnel %s for localSocket %s', key, forward.localSocketPath)
forwardLog.debug('creating tunnel %s for localSocket %s', key, forward.localSocketPath)
await envStore.set(key, {
clientUniqueId: uniqueId,
hostnameSuffix,
target: forward.localSocketPath,
clientId,
publicKey: createPublicKey(publicKey.getPublicPEM()),
access,
})
tunnels.set(requestId, tunnelUrl(BASE_URL, clientId, remotePath))
tunnelsGauge.inc({ clientId })
tunnels.set(requestId, {
tunnelUrl: tunnelUrl(BASE_URL, hostnameSuffix, remotePath),
closeForward: () => {
forwardLog.debug('calling forward.close')
forward.close()
},
})
tunnelsGauge.inc({ clientId: hostnameSuffix })

forward.on('close', () => {
sshLogger.debug('deleting tunnel %s', key)
forward.on('close', async () => {
forwardLog.debug('forward close event')
tunnels.delete(requestId)
void envStore.delete(key)
tunnelsGauge.dec({ clientId })
const storedEnv = await envStore.delete(key, uniqueId)
if (!storedEnv) {
forwardLog.info('forward.close: no stored env')
return
}
tunnelsGauge.dec({ clientId: hostnameSuffix })
})
})
.on('error', err => { sshLogger.warn('client error %j: %j', clientId, inspect(err)) })
.on('close', () => {
clientLog.debug('client %s closed', uniqueId)
tunnels.forEach(t => t.closeForward())
tunnelsPerClientUniqueId.delete(uniqueId)
})
.on('error', err => { clientLog.warn('client error %j', inspect(err)) })
.on('hello', channel => {
channel.stdout.write(`${JSON.stringify({
clientId,
clientId: hostnameSuffix,
// TODO: backwards compat, remove when we drop support for CLI v0.0.35
baseUrl: { hostname: BASE_URL.hostname, port: BASE_URL.port, protocol: BASE_URL.protocol },
rootUrl: BASE_URL.toString(),
Expand Down
28 changes: 20 additions & 8 deletions tunnel-server/src/preview-env.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { KeyObject } from 'crypto'
import EventEmitter from 'events'

export type PreviewEnv = {
clientId: string
clientUniqueId: string
hostnameSuffix: string
target: string
publicKey: KeyObject
access: 'private' | 'public'
Expand All @@ -11,17 +13,27 @@ export type PreviewEnvStore = {
get: (key: string) => Promise<PreviewEnv | undefined>
set: (key: string, env: PreviewEnv) => Promise<void>
has: (key: string) => Promise<boolean>
delete: (key: string) => Promise<boolean>
delete: (key: string, clientUniqueId: string) => Promise<boolean>
}

export const inMemoryPreviewEnvStore = (initial?: Record<string, PreviewEnv>): PreviewEnvStore => {
const map = new Map<string, PreviewEnv>(Object.entries(initial ?? {}))
return {
get: async key => map.get(key),
set: async (key, value) => {
const emitter = new EventEmitter()
return Object.assign(emitter, {
get: async (key: string) => map.get(key),
set: async (key: string, value: PreviewEnv) => {
map.set(key, value)
},
has: async key => map.has(key),
delete: async key => map.delete(key),
}
has: async (key: string) => map.has(key),
delete: (key: string, clientUniqueId: string) => new Promise<boolean>(resolve => {
const existing = map.get(key)
if (!existing || existing.clientUniqueId !== clientUniqueId) {
resolve(false)
return
}
map.delete(key)
resolve(true)
emitter.emit('deleted', key)
}),
})
}
2 changes: 1 addition & 1 deletion tunnel-server/src/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export function proxyHandlers({
}

logger.debug('proxying to %j', { target: env.target, url: req.url })
requestsCounter.inc({ clientId: env.clientId })
requestsCounter.inc({ clientId: env.hostnameSuffix })

return proxy.web(
req,
Expand Down
67 changes: 42 additions & 25 deletions tunnel-server/src/ssh-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import EventEmitter from 'node:events'
import { Writable } from 'node:stream'
import { ForwardRequest, parseForwardRequest } from './forward-request'

const idFromPublicSsh = (key: Buffer) =>
const hostnameSuffixFromPublicSsh = (key: Buffer) =>
crypto.createHash('sha1').update(key).digest('base64url').replace(/[_-]/g, '')
.slice(0, 8)
.toLowerCase()
Expand All @@ -31,10 +31,12 @@ const parseForwardRequestFromSocketBindInfo = (
export interface ClientForward extends EventEmitter {
localSocketPath: string
on: (event: 'close', listener: () => void) => this
close: () => void
}

export interface SshClient extends EventEmitter {
clientId: string
hostnameSuffix: string
uniqueId: string
publicKey: ParsedKey
on: (
(
Expand All @@ -58,6 +60,11 @@ export interface SshClient extends EventEmitter {
event: 'error',
listener: (err: Error) => void,
) => this
) & (
(
event: 'close',
listener: () => void,
) => this
)
}

Expand Down Expand Up @@ -88,6 +95,8 @@ export const sshServer = (
socketDir: string
}
): SshServer => {
const serverId = randomBytes(8).toString('base64url').replace(/[^A-Za-z0-9]/g, '')
let currentClientId = 0
const serverEmitter = new EventEmitter() as Omit<SshServer, 'close' | 'listen'>
const server = new ssh2.Server(
{
Expand All @@ -97,44 +106,48 @@ export const sshServer = (
hostKeys: [sshPrivateKey],
},
client => {
currentClientId += 1
const uniqueId = `${serverId}-${currentClientId}`
const clientLog = log.child({ clientUniqueId: uniqueId })
let preevySshClient: SshClient
const socketServers = new Map<string, net.Server>()

client
.on('authentication', ctx => {
log.debug('authentication: %j', ctx)
clientLog.debug('authentication: %j', ctx)
if (ctx.method !== 'publickey') {
ctx.reject(['publickey'])
return
}

const keyOrError = ssh2.utils.parseKey(ctx.key.data)
if (!('getPublicSSH' in keyOrError)) {
log.error('error parsing key: %j', keyOrError)
clientLog.error('error parsing key: %j', keyOrError)
ctx.reject()
return
}

// calling "accept" when no signature specified does not result in authenticated state
// see: https://github.com/mscdex/ssh2/issues/561#issuecomment-303263753
if (ctx.signature && !keyOrError.verify(ctx.blob as Buffer, ctx.signature, ctx.key.algo)) {
log.error('error verifying key: %j', keyOrError)
clientLog.error('error verifying key: %j', keyOrError)
ctx.reject(['publickey'])
return
}

preevySshClient = Object.assign(new EventEmitter(), {
publicKey: keyOrError,
clientId: idFromPublicSsh(keyOrError.getPublicSSH()),
hostnameSuffix: hostnameSuffixFromPublicSsh(keyOrError.getPublicSSH()),
uniqueId,
})
log.debug('accepting clientId %j', preevySshClient.clientId)
clientLog.debug('accepting hostnameSuffix %j', preevySshClient.hostnameSuffix)
ctx.accept()
serverEmitter.emit('client', preevySshClient)
})
.on('request', async (accept, reject, name, info) => {
log.debug('request %j', { accept, reject, name, info })
clientLog.debug('request %j', { accept, reject, name, info })
if (!client.authenticated) {
log.error('not authenticated, rejecting')
clientLog.error('not authenticated, rejecting')
reject?.()
return
}
Expand All @@ -143,7 +156,7 @@ export const sshServer = (
const request = forwardRequestFromSocketBindInfo(info as unknown as SocketBindInfo)
const deleted = socketServers.get(request)
if (!deleted) {
log.error('[email protected]: request %j not found, rejecting', request)
clientLog.error('[email protected]: request %j not found, rejecting', request)
reject?.()
return
}
Expand All @@ -153,23 +166,23 @@ export const sshServer = (
}

if ((name as string) !== '[email protected]') {
log.error('invalid request %j', { name, info })
clientLog.error('invalid request %j', { name, info })
reject?.()
return
}

const res = parseForwardRequestFromSocketBindInfo(info as unknown as SocketBindInfo)
const { request } = res
if ('error' in res) {
log.error('[email protected]: rejecting %j, error parsing: %j', request, inspect(res.error))
clientLog.error('[email protected]: rejecting %j, error parsing: %j', request, inspect(res.error))
reject?.()
return
}

const { parsed } = res

if (socketServers.has(request)) {
log.error('[email protected]: rejecting %j, duplicate socket request', request)
clientLog.error('[email protected]: rejecting %j, duplicate socket request', request)
reject?.()
return
}
Expand All @@ -180,15 +193,15 @@ export const sshServer = (
parsed,
() => new Promise<ClientForward>((resolveForward, rejectForward) => {
const socketServer = net.createServer(socket => {
log.debug('socketServer connected %j', socket)
clientLog.debug('socketServer connected %j', socket)
client.openssh_forwardOutStreamLocal(
request,
(err, upstream) => {
if (err) {
log.error('error forwarding request %j: %s', request, inspect(err))
clientLog.error('error forwarding request %j: %s', request, inspect(err))
socket.end()
socketServer.close(closeErr => {
log.error('error closing socket server for request %j: %j', request, inspect(closeErr))
clientLog.error('error closing socket server for request %j: %j', request, inspect(closeErr))
})
return
}
Expand All @@ -197,48 +210,52 @@ export const sshServer = (
)
})

const socketPath = path.join(socketDir, `s_${preevySshClient.clientId}_${randomBytes(16).toString('hex')}`)
const socketPath = path.join(socketDir, `s_${preevySshClient.hostnameSuffix}_${randomBytes(16).toString('hex')}`)

const closeSocketServer = () => socketServer.close()

socketServer
.listen(socketPath, () => {
log.debug('[email protected]: request %j calling accept: %j', request, accept)
clientLog.debug('[email protected]: request %j calling accept: %j', request, accept)
accept?.()
socketServers.set(request, socketServer)
resolveForward(Object.assign(socketServer, { localSocketPath: socketPath }))
})
.on('error', (err: unknown) => {
log.error('socketServer request %j error: %j', request, err)
clientLog.error('socketServer request %j error: %j', request, err)
socketServer.close()
rejectForward(err)
})
.on('close', () => {
log.debug('socketServer close: %j', socketPath)
clientLog.debug('socketServer close: %j', socketPath)
socketServers.delete(request)
client.removeListener('close', closeSocketServer)
})

client.once('close', closeSocketServer)
}),
(reason: Error) => {
log.error('[email protected]: rejecting %j, reason: %j', request, inspect(reason))
clientLog.error('[email protected]: rejecting %j, reason: %j', request, inspect(reason))
reject?.()
}
)
})
.on('error', err => {
log.error('client error: %j', inspect(err))
clientLog.error('client error: %j', inspect(err))
preevySshClient?.emit('error', err)
})
.on('close', () => {
clientLog.debug('client close')
serverEmitter?.emit('close')
})
.on('session', accept => {
log.debug('session')
clientLog.debug('session')
const session = accept()

session.on('exec', async (acceptExec, rejectExec, info) => {
log.debug('exec %j', info)
clientLog.debug('exec %j', info)
if (info.command !== 'hello') {
log.error('invalid exec command %j', info.command)
clientLog.error('invalid exec command %j', info.command)
rejectExec()
return
}
Expand Down

0 comments on commit ef3bb10

Please sign in to comment.