Skip to content

Commit

Permalink
Add back in main isDeleted code
Browse files Browse the repository at this point in the history
  • Loading branch information
bankisan committed Dec 12, 2023
1 parent f143489 commit 5bb2ca1
Showing 1 changed file with 24 additions and 33 deletions.
57 changes: 24 additions & 33 deletions packages/core/src/indexing-store/postgres/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,48 +771,39 @@ export class PostgresIndexingStore implements IndexingStore {
encodeBigInts: false,
});
const encodedCheckpoint = encodeCheckpoint(checkpoint);
const isDeleted = await this.db.transaction().execute(async (tx) => {
// If the latest version has effectiveFromCheckpoint equal to current checkpoint,
// this row was created within the same indexing function, and we can delete it.
let deletedRow = await tx
.deleteFrom(table)
.where(
"id",
this.idColumnComparator({ tableName, schema: this.schema }),
formattedId,
)
.where("effectiveFromCheckpoint", "=", encodedCheckpoint)
.where("effectiveToCheckpoint", "=", "latest")
.returning(["id"])
.executeTakeFirst();

const isDeleted = await this.writerDB
.transaction()
.execute(async (tx) => {
// If the latest version has effectiveFromCheckpoint equal to current checkpoint,
// this row was created within the same indexing function, and we can delete it.
let deletedRow = await tx
.deleteFrom(table)
// If we did not take the shortcut above, update the latest record
// setting effectiveToCheckpoint to the current checkpoint.
if (!deletedRow) {
deletedRow = await tx
.updateTable(table)
.set({ effectiveToCheckpoint: encodedCheckpoint })
.where(
"id",
this.idColumnComparator({
tableName,
schema: this.schema,
}),
this.idColumnComparator({ tableName, schema: this.schema }),
formattedId,
)
.where("effectiveFromCheckpoint", "=", encodedCheckpoint)
.where("effectiveToCheckpoint", "=", "latest")
.returning(["id"])
.executeTakeFirst();
}

// If we did not take the shortcut above, update the latest record
// setting effectiveToCheckpoint to the current checkpoint.
if (!deletedRow) {
deletedRow = await tx
.updateTable(table)
.set({ effectiveToCheckpoint: encodedCheckpoint })
.where(
"id",
this.idColumnComparator({
tableName,
schema: this.schema,
}),
formattedId,
)
.where("effectiveToCheckpoint", "=", "latest")
.returning(["id"])
.executeTakeFirst();
}

return !!deletedRow;
});
return !!deletedRow;
});

return isDeleted;
});
Expand Down

0 comments on commit 5bb2ca1

Please sign in to comment.