Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable processing hot blocks & Add DataObjectDeleted event #9

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 1.3.0

- Disables processing of Hot/Un-finalized blocks by the processor.
- Add `DataObjectDeletedEventData` event type.

# 1.2.0

- patched `@subsquid/openreader` to pass parameter values in 'queryConfig' object instead of passing as param to `pgClient.query` function.
Expand Down
2 changes: 0 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ ADD joystream.jsonl .
ADD src src
ADD schema schema
ADD scripts scripts
RUN npx squid-typeorm-codegen
RUN npx squid-substrate-typegen typegen.json
RUN npm run generate:schema || true
RUN npx squid-typeorm-codegen
RUN npm run build

Expand Down
13 changes: 9 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ dbgen:

generate-migrations:
@rm db/migrations/*-Data.js || true
@docker-compose down -v
@docker network create joystream_default || true
@docker-compose up -d squid_db
@npx squid-typeorm-migration generate
@docker run -d --name temp_migrations_db \
-e POSTGRES_DB=squid \
-e POSTGRES_HOST_AUTH_METHOD=trust \
-v temp_migrations_db_volume:/var/lib/postgresql/data \
-v ./db/postgres.conf:/etc/postgresql/postgresql.conf \
-p 5555:5555 postgres:14 postgres -p 5555 || true
@export DB_PORT=5555 && sleep 5 && npx squid-typeorm-migration generate
@docker rm temp_migrations_db -vf || true
@docker volume rm temp_migrations_db_volume || true

codegen:
@npm run generate:schema || true
Expand Down
13 changes: 13 additions & 0 deletions db/migrations/2100000000000-Indexes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module.exports = class Indexes2000000000000 {
name = 'Indexes2000000000000'

async up(db) {
await db.query(`CREATE INDEX "events_type" ON "event" USING BTREE (("data"->>'isTypeOf'));`)
await db.query(`CREATE INDEX "events_dataObjectId" ON "event" USING BTREE (("data"->>'dataObjectId'));`)
}

async down(db) {
await db.query(`DROP INDEX "events_type";`)
await db.query(`DROP INDEX "events_dataObjectId";`)
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "storage-squid",
"version": "1.2.0",
"version": "1.3.0",
"engines": {
"node": ">=16"
},
Expand Down
7 changes: 6 additions & 1 deletion schema/events.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Event @entity {
data: EventData!
}

union EventData = MetaprotocolTransactionStatusEventData
union EventData = MetaprotocolTransactionStatusEventData | DataObjectDeletedEventData

type MetaprotocolTransactionResultOK {
phantom: Int
Expand All @@ -36,3 +36,8 @@ type MetaprotocolTransactionStatusEventData {
"The result of metaprotocol action"
result: MetaprotocolTransactionResult!
}

type DataObjectDeletedEventData {
"Runtime ID of deleted the deleted object"
dataObjectId: ID!
}
2 changes: 1 addition & 1 deletion scripts/get-graphql-schema.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ npx squid-graphql-server &
# Wait for 5 seconds to allow the server to start
sleep 5

# Get the GraphQL schema and output it to a file
# Get the GraphQL schema and output it
./node_modules/get-graphql-schema/dist/index.js http://localhost:${GQL_PORT}/graphql

# Find the PID of the squid-graphql-server
Expand Down
14 changes: 11 additions & 3 deletions src/mappings/storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ export async function processDynamicBagCreatedEvent({

export async function processDynamicBagDeletedEvent({
overlay,
block,
indexInBlock,
extrinsicHash,
event,
eventDecoder,
}: EventHandlerContext<'Storage.DynamicBagDeleted'>): Promise<void> {
Expand All @@ -255,7 +258,7 @@ export async function processDynamicBagDeletedEvent({
.getManyByRelation('storageBagId', dynBagId)
overlay.getRepository(StorageBucketBag).remove(...bagStorageBucketRelations)
overlay.getRepository(DistributionBucketBag).remove(...bagDistributionBucketRelations)
await deleteDataObjects(overlay, objects)
await deleteDataObjects(overlay, block, indexInBlock, extrinsicHash, objects)
overlay.getRepository(StorageBag).remove(dynBagId)
}

Expand Down Expand Up @@ -284,6 +287,8 @@ export async function processDataObjectsUploadedEvent({
export async function processDataObjectsUpdatedEvent({
overlay,
block,
indexInBlock,
extrinsicHash,
event,
eventDecoder,
}: EventHandlerContext<'Storage.DataObjectsUpdated'>): Promise<void> {
Expand All @@ -302,7 +307,7 @@ export async function processDataObjectsUpdatedEvent({
stateBloatBond,
uploadedObjectIds
)
await deleteDataObjectsByIds(overlay, objectsToRemoveIds)
await deleteDataObjectsByIds(overlay, block, indexInBlock, extrinsicHash, objectsToRemoveIds)
}

export async function processPendingDataObjectsAcceptedEvent({
Expand Down Expand Up @@ -339,11 +344,14 @@ export async function processDataObjectsMovedEvent({

export async function processDataObjectsDeletedEvent({
overlay,
block,
indexInBlock,
extrinsicHash,
event,
eventDecoder,
}: EventHandlerContext<'Storage.DataObjectsDeleted'>): Promise<void> {
const [, , dataObjectIds] = eventDecoder.v1000.decode(event)
await deleteDataObjectsByIds(overlay, dataObjectIds)
await deleteDataObjectsByIds(overlay, block, indexInBlock, extrinsicHash, dataObjectIds)
}

// DISTRIBUTION FAMILY EVENTS
Expand Down
29 changes: 26 additions & 3 deletions src/mappings/storage/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { hexToString } from '@polkadot/util'
import {
DataObjectDeletedEventData,
DistributionBucketOperator,
DistributionBucketOperatorMetadata,
Event,
StorageBag,
StorageBagOwner,
StorageBagOwnerChannel,
Expand All @@ -21,6 +23,7 @@ import {
} from '../../types/v1000'
import { criticalError } from '../../utils/misc'
import { EntityManagerOverlay, Flat, RepositoryOverlay } from '../../utils/overlay'
import { genericEventFields } from '../utils'

export function getDynamicBagId(bagId: DynamicBagIdType): string {
if (bagId.__kind === 'Channel') {
Expand Down Expand Up @@ -164,12 +167,32 @@ export async function getOrCreateBag(

export async function deleteDataObjects(
overlay: EntityManagerOverlay,
block: Block,
indexInBlock: number,
extrinsicHash: string | undefined,
objects: Flat<StorageDataObject>[]
) {
overlay.getRepository(StorageDataObject).remove(...objects)
for (const object of objects) {
// Add event for data object deletion
overlay.getRepository(Event).new({
...genericEventFields(overlay, block, indexInBlock, extrinsicHash),
data: new DataObjectDeletedEventData({
dataObjectId: object.id,
}),
})

// Remove data object
overlay.getRepository(StorageDataObject).remove(object)
}
}

export async function deleteDataObjectsByIds(overlay: EntityManagerOverlay, ids: bigint[]) {
export async function deleteDataObjectsByIds(
overlay: EntityManagerOverlay,
block: Block,
indexInBlock: number,
extrinsicHash: string | undefined,
ids: bigint[]
) {
const dataObjectRepository = overlay.getRepository(StorageDataObject)
const subtitlesRepository = overlay.getRepository(VideoSubtitle)
const objects = await Promise.all(
Expand All @@ -181,5 +204,5 @@ export async function deleteDataObjectsByIds(overlay: EntityManagerOverlay, ids:
)

subtitlesRepository.remove(...currentSubtitles.flat())
await deleteDataObjects(overlay, objects)
await deleteDataObjects(overlay, block, indexInBlock, extrinsicHash, objects)
}
57 changes: 30 additions & 27 deletions src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,35 +153,38 @@ async function processEvent<EventName extends EventNames>(
await eventHandler({ block, overlay, event, eventDecoder, indexInBlock, extrinsicHash })
}

PROCESSOR.run(new TypeormDatabase({ isolationLevel: 'READ COMMITTED' }), async (ctx) => {
Logger.set(ctx.log)

const overlay = await EntityManagerOverlay.create(ctx.store)

for (const block of ctx.blocks) {
for (const event of block.events) {
if (event.name !== '*') {
ctx.log.info(`Processing ${event.name} event in block ${block.header.height}...`)

await processEvent(
event.name as EventNames,
block.header,
event.index,
event.extrinsic?.hash,
event,
overlay
)
// Update database if the number of cached entities exceeded MAX_CACHED_ENTITIES
if (overlay.totalCacheSize() > maxCachedEntities) {
ctx.log.info(
`Max memory cache size of ${maxCachedEntities} exceeded, updating database...`
PROCESSOR.run(
new TypeormDatabase({ isolationLevel: 'READ COMMITTED', supportHotBlocks: false }),
async (ctx) => {
Logger.set(ctx.log)

const overlay = await EntityManagerOverlay.create(ctx.store)

for (const block of ctx.blocks) {
for (const event of block.events) {
if (event.name !== '*') {
ctx.log.info(`Processing ${event.name} event in block ${block.header.height}...`)

await processEvent(
event.name as EventNames,
block.header,
event.index,
event.extrinsic?.hash,
event,
overlay
)
await overlay.updateDatabase()
// Update database if the number of cached entities exceeded MAX_CACHED_ENTITIES
if (overlay.totalCacheSize() > maxCachedEntities) {
ctx.log.info(
`Max memory cache size of ${maxCachedEntities} exceeded, updating database...`
)
await overlay.updateDatabase()
}
}
}
}
}

ctx.log.info(`Saving database updates...`)
await overlay.updateDatabase()
})
ctx.log.info(`Saving database updates...`)
await overlay.updateDatabase()
}
)
Loading