feat: s3 post handler (#594)
fenos authored Dec 31, 2024
1 parent f45ccaf commit d6db70e
Showing 17 changed files with 1,353 additions and 942 deletions.
1,405 changes: 745 additions & 660 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
@@ -79,6 +79,7 @@
"xml2js": "^0.6.2"
},
"devDependencies": {
"@aws-sdk/s3-presigned-post": "3.654.0",
"@types/async-retry": "^1.4.5",
"@types/busboy": "^1.3.0",
"@types/crypto-js": "^4.1.1",
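The new @aws-sdk/s3-presigned-post dev dependency suggests the test suite generates presigned POST policies to exercise the new handler. A minimal sketch of producing one against a local Storage endpoint (the endpoint, bucket, key, and credentials below are hypothetical):

import { S3Client } from '@aws-sdk/client-s3'
import { createPresignedPost } from '@aws-sdk/s3-presigned-post'

async function presignPostExample() {
  // Hypothetical local Storage S3 endpoint and credentials.
  const client = new S3Client({
    endpoint: 'http://localhost:5000/s3',
    region: 'us-east-1',
    forcePathStyle: true,
    credentials: { accessKeyId: 'anon-key', secretAccessKey: 'anon-secret' },
  })

  // Returns the form action URL plus the signed policy fields the client
  // must echo back verbatim in the multipart body.
  const { url, fields } = await createPresignedPost(client, {
    Bucket: 'avatars',
    Key: 'profile.png',
    Expires: 3600, // policy lifetime in seconds
    Conditions: [['content-length-range', 0, 5 * 1024 * 1024]],
  })

  return { url, fields }
}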
33 changes: 31 additions & 2 deletions src/http/plugins/signature-v4.ts
@@ -6,6 +6,7 @@ import { signJWT, verifyJWT } from '@internal/auth'
import { ERRORS } from '@internal/errors'

import { getConfig } from '../../config'
+ import { MultipartFile } from '@fastify/multipart'

const {
anonKey,
@@ -25,10 +26,16 @@ const {

type AWSRequest = FastifyRequest<{ Querystring: { 'X-Amz-Credential'?: string } }>

+ declare module 'fastify' {
+ interface FastifyRequest {
+ multiPartFileStream?: MultipartFile
+ }
+ }

export const signatureV4 = fastifyPlugin(
async function (fastify: FastifyInstance) {
fastify.addHook('preHandler', async (request: AWSRequest) => {
- const clientSignature = extractSignature(request)
+ const clientSignature = await extractSignature(request)

const sessionToken = clientSignature.sessionToken

@@ -101,7 +108,7 @@ export const signatureV4 = fastifyPlugin(
{ name: 'auth-signature-v4' }
)

- function extractSignature(req: AWSRequest) {
+ async function extractSignature(req: AWSRequest) {
if (typeof req.headers.authorization === 'string') {
return SignatureV4.parseAuthorizationHeader(req.headers)
}
@@ -110,6 +117,28 @@ function extractSignature(req: AWSRequest) {
return SignatureV4.parseQuerySignature(req.query)
}

+ if (typeof req.isMultipart === 'function' && req.isMultipart()) {
+ const formData = new FormData()
+ const data = await req.file({
+ limits: {
+ fields: 20,
+ files: 1,
+ },
+ })
+
+ const fields = data?.fields
+ if (fields) {
+ for (const key in fields) {
+ if (fields.hasOwnProperty(key) && (fields[key] as any).fieldname !== 'file') {
+ formData.append(key, (fields[key] as any).value)
+ }
+ }
+ }
+ // Assign the multipartFileStream for later use
+ req.multiPartFileStream = data
+ return SignatureV4.parseMultipartSignature(formData)
+ }

throw ERRORS.AccessDenied('Missing signature')
}

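The multipart branch above reconstructs the signed policy fields into a FormData (skipping the file part) so the V4 signature can be verified, and stashes the file stream on the request for the route handler to consume. The client side of that contract follows the standard S3 POST policy shape; a sketch using the presigned output from the earlier example:

async function uploadViaPresignedPost(
  url: string,
  fields: Record<string, string>,
  file: Blob
) {
  const form = new FormData()

  // The signed policy fields (key, policy, x-amz-signature, ...) must be
  // appended before the file entry; S3 requires the file to come last.
  for (const [name, value] of Object.entries(fields)) {
    form.append(name, value)
  }
  form.append('file', file)

  const res = await fetch(url, { method: 'POST', body: form })
  if (!res.ok) {
    throw new Error(`POST upload failed with status ${res.status}`)
  }
}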
9 changes: 6 additions & 3 deletions src/http/plugins/xml.ts
@@ -8,7 +8,7 @@ import xml from 'xml2js'
// @ts-ignore
import xmlBodyParser from 'fastify-xml-body-parser'

- export const jsonToXml = fastifyPlugin(
+ export const xmlParser = fastifyPlugin(
async function (
fastify: FastifyInstance,
opts: { disableContentParser?: boolean; parseAsArray?: string[] }
@@ -17,16 +17,19 @@ export const jsonToXml = fastifyPlugin(

if (!opts.disableContentParser) {
fastify.register(xmlBodyParser, {
- contentType: ['text/xml', 'application/xml', '*'],
+ contentType: ['text/xml', 'application/xml'],
isArray: (_: string, jpath: string) => {
return opts.parseAsArray?.includes(jpath)
},
})
}

fastify.addHook('preSerialization', async (req, res, payload) => {
const accept = req.accepts()

- if (accept.types(['application/xml', 'application/json']) === 'application/xml') {
+ const acceptedTypes = ['application/xml', 'text/html']
+
+ if (acceptedTypes.some((allowed) => accept.types(acceptedTypes) === allowed)) {
res.serializer((payload) => payload)

const xmlBuilder = new xml.Builder({
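The renamed xmlParser plugin now serializes responses with xml2js only when the client's Accept header matches. For reference, the underlying Builder works like this (a standalone illustration, not code from the commit):

import xml from 'xml2js'

// Builds an XML document from a plain object, the same mechanism the
// preSerialization hook uses for clients that accept XML.
const builder = new xml.Builder()
const body = builder.buildObject({
  Error: { Code: 'NoSuchKey', Message: 'The specified key does not exist.' },
})
// body is an XML string: <Error><Code>NoSuchKey</Code>...</Error>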
8 changes: 4 additions & 4 deletions src/http/routes/object/createObject.ts
@@ -72,15 +72,15 @@ export default async function routes(fastify: FastifyInstance) {
const objectName = request.params['*']

const isUpsert = request.headers['x-upsert'] === 'true'
- const owner = request.owner as string
+ const owner = request.owner

const { objectMetadata, path, id } = await request.storage
.from(bucketName)
- .uploadNewObject(request, {
+ .uploadFromRequest(request, {
objectName,
- owner,
- isUpsert,
signal: request.signals.body.signal,
+ owner: owner,
+ isUpsert,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
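The call now goes through the generic uploadFromRequest and threads the request body's AbortSignal into the upload. A minimal sketch of signal-aware stream handling (the helper name is illustrative, not the repo's uploader):

import { pipeline } from 'stream/promises'
import type { Readable, Writable } from 'stream'

// If the signal fires (for example, the client disconnects), pipeline
// rejects with an AbortError and destroys both streams.
async function copyWithAbort(src: Readable, dst: Writable, signal: AbortSignal) {
  await pipeline(src, dst, { signal })
}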
8 changes: 5 additions & 3 deletions src/http/routes/object/updateObject.ts
@@ -3,6 +3,7 @@ import { FromSchema } from 'json-schema-to-ts'
import { createDefaultSchema } from '../../routes-helper'
import { ROUTE_OPERATIONS } from '../operations'
import fastifyMultipart from '@fastify/multipart'
+ import { fileUploadFromRequest } from '@storage/uploader'

const updateObjectParamsSchema = {
type: 'object',
@@ -74,10 +75,11 @@

const { objectMetadata, path, id } = await request.storage
.from(bucketName)
- .uploadOverridingObject(request, {
- owner,
- objectName: objectName,
+ .uploadFromRequest(request, {
+ objectName,
signal: request.signals.body.signal,
+ owner: owner,
+ isUpsert: true,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
2 changes: 1 addition & 1 deletion src/http/routes/object/uploadSignedObject.ts
@@ -90,7 +90,7 @@ export default async function routes(fastify: FastifyInstance) {
const { objectMetadata, path } = await request.storage
.asSuperUser()
.from(bucketName)
- .uploadNewObject(request, {
+ .uploadFromRequest(request, {
owner,
objectName,
isUpsert: upsert,
157 changes: 157 additions & 0 deletions src/http/routes/s3/commands/put-object.ts
@@ -0,0 +1,157 @@
import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
import { S3Router } from '../router'
import { ROUTE_OPERATIONS } from '../../operations'
import { Multipart, MultipartValue } from '@fastify/multipart'
import { fileUploadFromRequest, getStandardMaxFileSizeLimit } from '@storage/uploader'
import { ERRORS } from '@internal/errors'
import { pipeline } from 'stream/promises'
import { ByteLimitTransformStream } from '@storage/protocols/s3/byte-limit-stream'
import stream from 'stream'

const PutObjectInput = {
summary: 'Put Object',
Params: {
type: 'object',
properties: {
Bucket: { type: 'string' },
'*': { type: 'string' },
},
required: ['Bucket', '*'],
},
Querystring: {
type: 'object',
},
Headers: {
type: 'object',
properties: {
authorization: { type: 'string' },
host: { type: 'string' },
'x-amz-content-sha256': { type: 'string' },
'x-amz-date': { type: 'string' },
'content-type': { type: 'string' },
'content-length': { type: 'integer' },
'cache-control': { type: 'string' },
'content-disposition': { type: 'string' },
'content-encoding': { type: 'string' },
expires: { type: 'string' },
},
required: ['content-length'],
},
} as const

const PostFormInput = {
summary: 'PostForm Object',
Params: {
type: 'object',
properties: {
Bucket: { type: 'string' },
},
required: ['Bucket'],
},
} as const

export default function PutObject(s3Router: S3Router) {
s3Router.put(
'/:Bucket/*',
{
schema: PutObjectInput,
operation: ROUTE_OPERATIONS.S3_UPLOAD,
disableContentTypeParser: true,
},
async (req, ctx) => {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)

const metadata = s3Protocol.parseMetadataHeaders(req.Headers)
const contentLength = req.Headers['content-length']
let key = req.Params['*']

if (key.endsWith('/') && contentLength === 0) {
// Consistent with how supabase Storage handles empty folders
key += '.emptyFolderPlaceholder'
}

const bucket = await ctx.storage
.asSuperUser()
.findBucket(req.Params.Bucket, 'id,file_size_limit,allowed_mime_types')

const uploadRequest = await fileUploadFromRequest(ctx.req, {
objectName: key,
allowedMimeTypes: bucket.allowed_mime_types || [],
fileSizeLimit: bucket.file_size_limit || undefined,
})

return s3Protocol.putObject(
{
Body: uploadRequest.body,
Bucket: req.Params.Bucket,
Key: key,
CacheControl: uploadRequest.cacheControl,
ContentType: uploadRequest.mimeType,
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
},
{ signal: ctx.signals.body, isTruncated: uploadRequest.isTruncated }
)
}
)

s3Router.post(
'/:Bucket|content-type=multipart/form-data',
{
schema: PostFormInput,
operation: ROUTE_OPERATIONS.S3_UPLOAD,
acceptMultiformData: true,
},
async (req, ctx) => {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)

const file = ctx.req.multiPartFileStream

if (!file) {
throw ERRORS.InvalidParameter('Missing file')
}

const bucket = await ctx.storage
.asSuperUser()
.findBucket(req.Params.Bucket, 'id,file_size_limit,allowed_mime_types')

const metadata = s3Protocol.parseMetadataHeaders(file?.fields || {})
const expiresField = normaliseFormDataField(file?.fields?.Expires) as string | undefined

const maxFileSize = await getStandardMaxFileSizeLimit(ctx.tenantId, bucket.file_size_limit)

return pipeline(file.file, new ByteLimitTransformStream(maxFileSize), async (fileStream) => {
return s3Protocol.putObject(
{
Body: fileStream as stream.Readable,
Bucket: req.Params.Bucket,
Key: normaliseFormDataField(file?.fields?.key) as string,
CacheControl: normaliseFormDataField(file?.fields?.['Cache-Control']) as string,
ContentType: normaliseFormDataField(file?.fields?.['Content-Type']) as string,
Expires: expiresField ? new Date(expiresField) : undefined,
ContentEncoding: normaliseFormDataField(file?.fields?.['Content-Encoding']) as string,
Metadata: metadata,
},
{ signal: ctx.signals.body, isTruncated: () => file.file.truncated }
)
})
}
)
}

function normaliseFormDataField(value: Multipart | Multipart[] | undefined) {
if (!value) {
return undefined
}

if (Array.isArray(value)) {
return (value[0] as MultipartValue).value as string
}

if (value.type === 'field') {
return value.value
}

return value.file
}
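The POST form path pipes the file through ByteLimitTransformStream before handing it to putObject, so a form upload cannot exceed the bucket or tenant limit. The real implementation lives in @storage/protocols/s3/byte-limit-stream and may differ; a transform of this kind typically looks like:

import { Transform, TransformCallback } from 'stream'

// Counts bytes as they flow through and fails the stream once the
// configured limit is exceeded (a sketch, not the repo's implementation).
class ByteLimitTransform extends Transform {
  private bytesSeen = 0

  constructor(private readonly limit: number) {
    super()
  }

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
    this.bytesSeen += chunk.length
    if (this.bytesSeen > this.limit) {
      callback(new Error('Payload exceeds the maximum allowed size'))
      return
    }
    callback(null, chunk)
  }
}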
58 changes: 0 additions & 58 deletions src/http/routes/s3/commands/upload-part.ts
@@ -2,36 +2,6 @@ import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
import { S3Router } from '../router'
import { ROUTE_OPERATIONS } from '../../operations'

- const PutObjectInput = {
- summary: 'Put Object',
- Params: {
- type: 'object',
- properties: {
- Bucket: { type: 'string' },
- '*': { type: 'string' },
- },
- required: ['Bucket', '*'],
- },
- Querystring: {
- type: 'object',
- },
- Headers: {
- type: 'object',
- properties: {
- authorization: { type: 'string' },
- host: { type: 'string' },
- 'x-amz-content-sha256': { type: 'string' },
- 'x-amz-date': { type: 'string' },
- 'content-type': { type: 'string' },
- 'content-length': { type: 'integer' },
- 'cache-control': { type: 'string' },
- 'content-disposition': { type: 'string' },
- 'content-encoding': { type: 'string' },
- expires: { type: 'string' },
- },
- },
- } as const

const UploadPartInput = {
summary: 'Upload Part',
Params: {
@@ -84,32 +54,4 @@
})
}
)

- s3Router.put(
- '/:Bucket/*',
- {
- schema: PutObjectInput,
- operation: ROUTE_OPERATIONS.S3_UPLOAD,
- disableContentTypeParser: true,
- },
- (req, ctx) => {
- const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)
-
- const metadata = s3Protocol.parseMetadataHeaders(req.Headers)
-
- return s3Protocol.putObject(
- {
- Body: ctx.req as any,
- Bucket: req.Params.Bucket,
- Key: req.Params['*'],
- CacheControl: req.Headers?.['cache-control'],
- ContentType: req.Headers?.['content-type'],
- Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
- ContentEncoding: req.Headers?.['content-encoding'],
- Metadata: metadata,
- },
- ctx.signals.body
- )
- }
- )
}