Skip to content

Commit

Permalink
Improve raw update (#7160)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Bykhov <[email protected]>
  • Loading branch information
BykhovDenis authored Nov 12, 2024
1 parent 8bec20e commit 1afe8ef
Showing 1 changed file with 82 additions and 33 deletions.
115 changes: 82 additions & 33 deletions server/postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,46 +296,95 @@ abstract class PostgresAdapterBase implements DbAdapter {
if ((operations as any).$set !== undefined) {
;(operations as any) = { ...(operations as any).$set }
}
const isOps = isOperator(operations)
if ((operations as any)['%hash%'] === undefined) {
;(operations as any)['%hash%'] = null
}
const conn = await this.client.reserve()
try {
await this.retryTxn(conn, async (client) => {
const res = await client.unsafe(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`)
const docs = res.map((p) => parseDoc(p as any))
for (const doc of docs) {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
const converted = convertDoc(domain, doc, this.workspaceId.name)
let paramsIndex = 3
const params: any[] = [doc._id, this.workspaceId.name]
const updates: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, operations)
const newAttachedTo = (doc as any).attachedTo
if (Object.keys(extractedFields).length > 0) {
for (const key in extractedFields) {
const val = (extractedFields as any)[key]
if (key === 'attachedTo' && val === prevAttachedTo) continue
updates.push(`"${key}" = $${paramsIndex++}`)
params.push(val)
if (isOps) {
const conn = await this.client.reserve()
try {
await this.retryTxn(conn, async (client) => {
const res = await client.unsafe(
`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`
)
const docs = res.map((p) => parseDoc(p as any))
for (const doc of docs) {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
const converted = convertDoc(domain, doc, this.workspaceId.name)
let paramsIndex = 3
const params: any[] = [doc._id, this.workspaceId.name]
const updates: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, operations)
const newAttachedTo = (doc as any).attachedTo
if (Object.keys(extractedFields).length > 0) {
for (const key in extractedFields) {
const val = (extractedFields as any)[key]
if (key === 'attachedTo' && val === prevAttachedTo) continue
updates.push(`"${key}" = $${paramsIndex++}`)
params.push(val)
}
} else if (prevAttachedTo !== undefined && prevAttachedTo !== newAttachedTo) {
updates.push(`"attachedTo" = $${paramsIndex++}`)
params.push(newAttachedTo)
}
} else if (prevAttachedTo !== undefined && prevAttachedTo !== newAttachedTo) {
updates.push(`"attachedTo" = $${paramsIndex++}`)
params.push(newAttachedTo)
}

if (Object.keys(remainingData).length > 0) {
updates.push(`data = $${paramsIndex++}`)
params.push(converted.data)
if (Object.keys(remainingData).length > 0) {
updates.push(`data = $${paramsIndex++}`)
params.push(converted.data)
}
await client.unsafe(
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $1 AND "workspaceId" = $2`,
params
)
}
await client.unsafe(
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $1 AND "workspaceId" = $2`,
params
)
}
})
} finally {
conn.release()
}
} else {
await this.rawUpdateDoc(domain, query, operations)
}
}

private async rawUpdateDoc<T extends Doc>(
domain: Domain,
query: DocumentQuery<T>,
operations: DocumentUpdate<T>
): Promise<void> {
const translatedQuery = this.buildRawQuery(domain, query)
const updates: string[] = []
const params: any[] = []
let paramsIndex = 5
const { extractedFields, remainingData } = parseUpdate(domain, operations)
const { space, attachedTo, ...ops } = operations as any
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
params.push((extractedFields as any)[key])
}
let from = 'data'
let dataUpdated = false
for (const key in remainingData) {
if (ops[key] === undefined) continue
const val = (remainingData as any)[key]
from = `jsonb_set(${from}, '{${key}}', coalesce(to_jsonb($${paramsIndex++}${inferType(val)}), 'null') , true)`
params.push(val)
dataUpdated = true
}
if (dataUpdated) {
updates.push(`data = ${from}`)
}
const conn = await this.client.reserve()
try {
await this.retryTxn(conn, async (client) => {
await client.unsafe(
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE ${translatedQuery}`,
params
)
})
} catch (err) {
console.error(err, { domain, params, updates })
} finally {
conn.release()
}
Expand Down

0 comments on commit 1afe8ef

Please sign in to comment.