Merge branch 'update-next' into next
jsumners committed Jul 16, 2024
2 parents b9f586c + 49a525d · commit 1e44a5f
Showing 9 changed files with 101 additions and 22 deletions.
index.js (6 changes: 4 additions & 2 deletions)

@@ -8,7 +8,7 @@ const { unlink } = require('node:fs/promises')
const path = require('node:path')
const { generateId } = require('./lib/generateId')
const createError = require('@fastify/error')
-const { sendToWormhole } = require('stream-wormhole')
+const streamToNull = require('./lib/stream-consumer')
const deepmergeAll = require('@fastify/deepmerge')({ all: true })
const { PassThrough, Readable } = require('node:stream')
const { pipeline: pump } = require('node:stream/promises')
@@ -141,6 +141,7 @@ function fastifyMultipart (fastify, options, done) {
}

async function append (key, entry) {
+/* c8 ignore next: Buffer.isBuffer is not covered and causing `npm test` to fail */
if (entry.type === 'file' || (attachFieldsToBody === 'keyValues' && Buffer.isBuffer(entry))) {
// TODO use File constructor with fs.openAsBlob()
// if attachFieldsToBody is not set
@@ -161,6 +162,7 @@ function fastifyMultipart (fastify, options, done) {
/* istanbul ignore next */
if (!fastify.hasRequestDecorator('formData')) {
fastify.decorateRequest('formData', async function () {
+/* c8 ignore next: Next line is not covered and causing `npm test` to fail */
throw new NoFormData()
})
}
@@ -347,7 +349,7 @@ function fastifyMultipart (fastify, options, done) {
// don't overwrite prototypes
if (name in Object.prototype) {
// ensure that stream is consumed, any error is suppressed
-sendToWormhole(file)
+streamToNull(file)
onError(new PrototypeViolationError())
return
}
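For context on the guard in the hunk above: any field name that already exists on Object.prototype (such as toString or __proto__) is drained and rejected rather than attached to the body, which blocks prototype-pollution payloads. A quick illustrative check in plain Node, separate from anything in this commit:

// Field names that collide with Object.prototype members trip the guard.
console.log('toString' in Object.prototype) // true  -> stream drained, PrototypeViolationError raised
console.log('avatar' in Object.prototype)   // false -> field processed normally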
lib/stream-consumer.js (18 changes: 18 additions & 0 deletions)

@@ -0,0 +1,18 @@
'use strict'

module.exports = function streamToNull (stream) {
return new Promise((resolve, reject) => {
stream.on('data', () => {
/* The stream needs a data reader or else it will never end. */
})
stream.on('close', () => {
resolve()
})
stream.on('end', () => {
resolve()
})
stream.on('error', (error) => {
reject(error)
})
})
}
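For reference, a minimal usage sketch of the new helper; the setup here is illustrative and not part of the commit. The no-op 'data' listener keeps the stream flowing, the promise resolves on either 'end' or 'close', and it rejects if the stream emits 'error':

'use strict'

const { Readable } = require('node:stream')
const streamToNull = require('./lib/stream-consumer')

async function main () {
  // A part we want to discard without buffering it in memory.
  const stream = Readable.from(['chunk-1', 'chunk-2', 'chunk-3'])

  // Attaches the listeners and settles once every chunk is consumed.
  await streamToNull(stream)
  console.log('stream fully drained')
}

main().catch(console.error)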
package.json (3 changes: 1 addition & 2 deletions)

@@ -10,8 +10,7 @@
"@fastify/deepmerge": "^2.0.0",
"@fastify/error": "^4.0.0",
"fastify-plugin": "^5.0.0-pre.fv5.1",
-"secure-json-parse": "^2.7.0",
-"stream-wormhole": "^2.0.1"
+"secure-json-parse": "^2.7.0"
},
"devDependencies": {
"@fastify/pre-commit": "^2.1.0",
test/big.test.js (9 changes: 5 additions & 4 deletions)

@@ -9,7 +9,7 @@ const stream = require('readable-stream')
const Readable = stream.Readable
const pump = stream.pipeline
const crypto = require('node:crypto')
-const sendToWormhole = require('stream-wormhole')
+const streamToNull = require('../lib/stream-consumer')

// skipping on Github Actions because it takes too long
test('should upload a big file in constant memory', { skip: process.env.CI }, function (t) {
@@ -38,7 +38,7 @@ test('should upload a big file in constant memory', { skip: process.env.CI }, fu
t.equal(part.encoding, '7bit')
t.equal(part.mimetype, 'binary/octet-stream')

-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
}

@@ -78,10 +78,11 @@ test('should upload a big file in constant memory', { skip: process.env.CI }, fu
knownLength
})

+const addresses = fastify.addresses()
const opts = {
protocol: 'http:',
-hostname: 'localhost',
-port: fastify.server.address().port,
+hostname: addresses[0].address,
+port: addresses[0].port,
path: '/',
headers: form.getHeaders(),
method: 'POST'
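A note on the hunk above: fastify.addresses() lists every address the server is bound to, so the test no longer reaches into fastify.server.address() directly. A rough, self-contained sketch of how it is used; the values shown are illustrative:

const fastify = require('fastify')()

fastify.listen({ port: 0 }).then(() => {
  // addresses() returns one entry per bound socket, e.g.:
  // [ { address: '127.0.0.1', family: 'IPv4', port: 41735 } ]
  const [{ address, port }] = fastify.addresses()
  console.log(address, port)
  return fastify.close()
})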
test/multipart-big-stream.test.js (4 changes: 2 additions & 2 deletions)

@@ -7,7 +7,7 @@ const multipart = require('..')
const http = require('node:http')
const crypto = require('node:crypto')
const { Readable } = require('readable-stream')
-const { sendToWormhole } = require('stream-wormhole')
+const streamToNull = require('../lib/stream-consumer')
const EventEmitter = require('node:events')
const { once } = EventEmitter

@@ -23,7 +23,7 @@ test('should emit fileSize limitation error during streaming', async function (t
fastify.post('/', async function (req, reply) {
t.ok(req.isMultipart())
const part = await req.file({ limits: { fileSize: 16500 } })
-await sendToWormhole(part.file)
+await streamToNull(part.file)
if (part.file.truncated) {
reply.code(500).send()
} else {
test/multipart-http2.test.js (4 changes: 2 additions & 2 deletions)

@@ -7,7 +7,7 @@ const multipart = require('..')
const h2url = require('h2url')
const path = require('node:path')
const fs = require('node:fs')
-const { sendToWormhole } = require('stream-wormhole')
+const streamToNull = require('../lib/stream-consumer')

const filePath = path.join(__dirname, '../README.md')

@@ -21,7 +21,7 @@ test('should respond when all files are processed', function (t) {
const parts = req.files()
for await (const part of parts) {
t.ok(part.file)
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
reply.code(200).send()
})
test/multipart-small-stream.test.js (6 changes: 3 additions & 3 deletions)

@@ -8,7 +8,7 @@ const http = require('node:http')
const path = require('node:path')
const fs = require('node:fs')
const EventEmitter = require('node:events')
-const { sendToWormhole } = require('stream-wormhole')
+const streamToNull = require('../lib/stream-consumer')
const { once } = EventEmitter

const filePath = path.join(__dirname, '../README.md')
@@ -26,7 +26,7 @@ test('should throw fileSize limitation error on small payload', { skip: true },
t.ok(req.isMultipart())

const part = await req.file({ limits: { fileSize: 2 } })
-await sendToWormhole(part.file)
+await streamToNull(part.file)

reply.code(200).send()
})
@@ -71,7 +71,7 @@ test('should not throw and error when throwFileSizeLimit option is false', { ski
t.ok(req.isMultipart())

const part = await req.file({ limits: { fileSize: 2 }, throwFileSizeLimit: false })
-await sendToWormhole(part.file)
+await streamToNull(part.file)

reply.code(200).send()
})
test/multipart.test.js (14 changes: 7 additions & 7 deletions)

@@ -12,7 +12,7 @@ const concat = require('concat-stream')
const stream = require('node:stream')
const { once } = require('node:events')
const pump = util.promisify(stream.pipeline)
-const { sendToWormhole } = require('stream-wormhole')
+const streamToNull = require('../lib/stream-consumer')

const filePath = path.join(__dirname, '../README.md')

@@ -89,7 +89,7 @@ test('should respond when all files are processed', function (t) {
for await (const part of parts) {
t.ok(part.file)
t.equal(part.type, 'file')
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
reply.code(200).send()
})
@@ -141,7 +141,7 @@ test('should group parts with the same name to an array', function (t) {
t.pass('multiple files are grouped by array')
}
if (part.file) {
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
}
reply.code(200).send()
@@ -270,7 +270,7 @@ test('should throw error due to filesLimit (The max number of file fields (Defau
const parts = req.files({ limits: { files: 1 } })
for await (const part of parts) {
t.ok(part.file, 'part received')
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
reply.code(200).send()
} catch (error) {
@@ -330,7 +330,7 @@ test('should be able to configure limits globally with plugin register options',
for await (const part of parts) {
t.ok(part.file)
t.equal(part.type, 'file')
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
reply.code(200).send()
} catch (error) {
@@ -485,7 +485,7 @@ test('should throw error due to file size limit exceed (Default: true)', functio
for await (const part of parts) {
t.ok(part.file)
t.equal(part.type, 'file')
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
reply.code(200).send()
} catch (error) {
@@ -532,7 +532,7 @@ test('should not throw error due to file size limit exceed - files setting (Defa
for await (const part of parts) {
t.ok(part.file)
t.equal(part.type, 'file')
-await sendToWormhole(part.file)
+await streamToNull(part.file)
}
reply.code(200).send()
})
test/stream-consumer.test.js (59 changes: 59 additions & 0 deletions)

@@ -0,0 +1,59 @@
'use strict'

const tap = require('tap')
const { Readable } = require('node:stream')
const streamToNull = require('../lib/stream-consumer')

tap.test('does what it should', async t => {
let count = 1_000_000
const stream = new Readable({
read () {
if (count === 0) {
this.push(null)
return
}
count -= 1
this.push(Buffer.from('1'))
}
})

await streamToNull(stream)
t.pass()
})

tap.test('handles close event', async t => {
let count = 1_000_000
const stream = new Readable({
read () {
if (count === 50_000) {
this.destroy()
return
}
count -= 1
this.push(Buffer.from('1'))
}
})

await streamToNull(stream)
t.pass()
})

tap.test('handles error event', async t => {
let count = 1_000_000
const stream = new Readable({
read () {
if (count === 50_000) {
this.destroy(Error('boom'))
return
}
count -= 1
this.push(Buffer.from('1'))
}
})

try {
await streamToNull(stream)
} catch (error) {
t.match(error, /boom/)
}
})
