From 4c06ff65a470680b01c1ceb4253462ac7a888dd7 Mon Sep 17 00:00:00 2001 From: Adam Sherwood Date: Tue, 30 Jan 2024 17:08:03 +0000 Subject: [PATCH 1/2] adds iterate method for level, sqlite, indexdb --- src/storage/IndexedDbTupleStorage.ts | 77 +++++++++++++++-------- src/storage/LevelTupleStorage.ts | 17 +++++ src/storage/SQLiteTupleStorage.ts | 93 ++++++++++++++++------------ 3 files changed, 122 insertions(+), 65 deletions(-) diff --git a/src/storage/IndexedDbTupleStorage.ts b/src/storage/IndexedDbTupleStorage.ts index 22244eb..7780df7 100644 --- a/src/storage/IndexedDbTupleStorage.ts +++ b/src/storage/IndexedDbTupleStorage.ts @@ -7,6 +7,36 @@ const version = 1 const storeName = "tupledb" +function buildRange(args?: ScanStorageArgs): IDBKeyRange | null { + const lower = args?.gt || args?.gte + const lowerEq = Boolean(args?.gte) + + const upper = args?.lt || args?.lte + const upperEq = Boolean(args?.lte) + + let range: IDBKeyRange | null + if (upper) { + if (lower) { + range = IDBKeyRange.bound( + encodeTuple(lower), + encodeTuple(upper), + !lowerEq, + !upperEq + ) + } else { + range = IDBKeyRange.upperBound(encodeTuple(upper), !upperEq) + } + } else { + if (lower) { + range = IDBKeyRange.lowerBound(encodeTuple(lower), !lowerEq) + } else { + range = null + } + } + + return range +} + export class IndexedDbTupleStorage implements AsyncTupleStorageApi { private db: Promise> @@ -23,32 +53,7 @@ export class IndexedDbTupleStorage implements AsyncTupleStorageApi { const tx = db.transaction(storeName, "readonly") const index = tx.store // primary key - const lower = args?.gt || args?.gte - const lowerEq = Boolean(args?.gte) - - const upper = args?.lt || args?.lte - const upperEq = Boolean(args?.lte) - - let range: IDBKeyRange | null - if (upper) { - if (lower) { - range = IDBKeyRange.bound( - encodeTuple(lower), - encodeTuple(upper), - !lowerEq, - !upperEq - ) - } else { - range = IDBKeyRange.upperBound(encodeTuple(upper), !upperEq) - } - } else { - if (lower) { - range = IDBKeyRange.lowerBound(encodeTuple(lower), !lowerEq) - } else { - range = null - } - } - + const range = buildRange(args) const direction: IDBCursorDirection = args?.reverse ? "prev" : "next" const limit = args?.limit || Infinity @@ -65,6 +70,26 @@ export class IndexedDbTupleStorage implements AsyncTupleStorageApi { return results } + async *iterate(args?: ScanStorageArgs): AsyncGenerator { + const db = await this.db + const tx = db.transaction(storeName, "readonly") + const index = tx.store // primary key + + const range = buildRange(args) + const direction: IDBCursorDirection = args?.reverse ? "prev" : "next" + + const limit = args?.limit || Infinity + let results: KeyValuePair[] = [] + for await (const cursor of index.iterate(range, direction)) { + yield { + key: decodeTuple(cursor.key), + value: cursor.value, + } + if (results.length >= limit) break + } + await tx.done + } + async commit(writes: WriteOps) { const db = await this.db const tx = db.transaction(storeName, "readwrite") diff --git a/src/storage/LevelTupleStorage.ts b/src/storage/LevelTupleStorage.ts index 05c80f5..d4df8af 100644 --- a/src/storage/LevelTupleStorage.ts +++ b/src/storage/LevelTupleStorage.ts @@ -35,6 +35,23 @@ export class LevelTupleStorage implements AsyncTupleStorageApi { return results } + async *iterate(args: ScanStorageArgs = {}): AsyncGenerator { + const dbArgs: any = {} + if (args.gt !== undefined) dbArgs.gt = encodeTuple(args.gt) + if (args.gte !== undefined) dbArgs.gte = encodeTuple(args.gte) + if (args.lt !== undefined) dbArgs.lt = encodeTuple(args.lt) + if (args.lte !== undefined) dbArgs.lte = encodeTuple(args.lte) + if (args.limit !== undefined) dbArgs.limit = args.limit + if (args.reverse !== undefined) dbArgs.reverse = args.reverse + + for await (const [key, value] of this.db.iterator(dbArgs)) { + yield { + key: decodeTuple(key), + value: decodeValue(value), + } as KeyValuePair + } + } + async commit(writes: WriteOps): Promise { const ops = [ ...(writes.remove || []).map( diff --git a/src/storage/SQLiteTupleStorage.ts b/src/storage/SQLiteTupleStorage.ts index 425669c..46cc538 100644 --- a/src/storage/SQLiteTupleStorage.ts +++ b/src/storage/SQLiteTupleStorage.ts @@ -3,6 +3,48 @@ import { TupleStorageApi } from "../database/sync/types" import { decodeTuple, encodeTuple } from "../helpers/codec" import { KeyValuePair, ScanStorageArgs, Tuple, WriteOps } from "./types" +function buildQuery(args: ScanStorageArgs = {}) { + // Bounds. + let start = args.gte ? encodeTuple(args.gte) : undefined + let startAfter: string | undefined = args.gt + ? encodeTuple(args.gt) + : undefined + let end: string | undefined = args.lte ? encodeTuple(args.lte) : undefined + let endBefore: string | undefined = args.lt ? encodeTuple(args.lt) : undefined + + const sqlArgs = { + start, + startAfter, + end, + endBefore, + limit: args.limit, + } + + const where = [ + start ? "key >= $start" : undefined, + startAfter ? "key > $startAfter" : undefined, + end ? "key <= $end" : undefined, + endBefore ? "key < $endBefore" : undefined, + ] + .filter(Boolean) + .join(" and ") + + let sqlQuery = `select * from data` + if (where) { + sqlQuery += " where " + sqlQuery += where + } + sqlQuery += " order by key" + if (args.reverse) { + sqlQuery += " desc" + } + if (args.limit) { + sqlQuery += ` limit $limit` + } + + return { sqlArgs, sqlQuery } +} + export class SQLiteTupleStorage implements TupleStorageApi { /** * import sqlite from "better-sqlite3" @@ -45,45 +87,7 @@ export class SQLiteTupleStorage implements TupleStorageApi { private writeFactsQuery: Transaction scan = (args: ScanStorageArgs = {}) => { - // Bounds. - let start = args.gte ? encodeTuple(args.gte) : undefined - let startAfter: string | undefined = args.gt - ? encodeTuple(args.gt) - : undefined - let end: string | undefined = args.lte ? encodeTuple(args.lte) : undefined - let endBefore: string | undefined = args.lt - ? encodeTuple(args.lt) - : undefined - - const sqlArgs = { - start, - startAfter, - end, - endBefore, - limit: args.limit, - } - - const where = [ - start ? "key >= $start" : undefined, - startAfter ? "key > $startAfter" : undefined, - end ? "key <= $end" : undefined, - endBefore ? "key < $endBefore" : undefined, - ] - .filter(Boolean) - .join(" and ") - - let sqlQuery = `select * from data` - if (where) { - sqlQuery += " where " - sqlQuery += where - } - sqlQuery += " order by key" - if (args.reverse) { - sqlQuery += " desc" - } - if (args.limit) { - sqlQuery += ` limit $limit` - } + const { sqlArgs, sqlQuery } = buildQuery(args) const results = this.db.prepare(sqlQuery).all(sqlArgs) @@ -94,6 +98,17 @@ export class SQLiteTupleStorage implements TupleStorageApi { value: JSON.parse(value), } as KeyValuePair) ) + }; + + *iterate(args: ScanStorageArgs = {}): Generator { + const { sqlArgs, sqlQuery } = buildQuery(args) + + for (const { key, value } of this.db.prepare(sqlQuery).iterate(sqlArgs)) { + yield { + key: decodeTuple(key) as Tuple, + value: JSON.parse(value), + } as KeyValuePair + } } commit = (writes: WriteOps) => { From d61097996691c7721a299097213b675714ca24b8 Mon Sep 17 00:00:00 2001 From: Adam Sherwood Date: Wed, 31 Jan 2024 09:40:00 +0000 Subject: [PATCH 2/2] updates types and adds tests --- src/database/async/AsyncTupleDatabase.ts | 9 + .../async/AsyncTupleDatabaseClient.ts | 14 + src/database/async/asyncDatabaseTestSuite.ts | 578 ++++++++++++++++++ src/database/async/asyncTypes.ts | 11 + src/database/async/subscribeQueryAsync.ts | 8 + src/database/sync/TupleDatabase.ts | 9 + src/database/sync/TupleDatabaseClient.ts | 14 + src/database/sync/databaseTestSuite.ts | 578 ++++++++++++++++++ src/database/sync/subscribeQuery.ts | 8 + src/database/sync/types.ts | 9 + src/helpers/DelayDb.ts | 6 + src/helpers/sortedList.ts | 22 +- src/helpers/sortedTupleValuePairs.ts | 10 + src/helpers/subspaceHelpers.ts | 2 +- src/storage/InMemoryTupleStorage.ts | 4 + 15 files changed, 1280 insertions(+), 2 deletions(-) diff --git a/src/database/async/AsyncTupleDatabase.ts b/src/database/async/AsyncTupleDatabase.ts index ad950c2..cffbdc9 100644 --- a/src/database/async/AsyncTupleDatabase.ts +++ b/src/database/async/AsyncTupleDatabase.ts @@ -23,6 +23,15 @@ export class AsyncTupleDatabase implements AsyncTupleDatabaseApi { return this.storage.scan({ ...bounds, reverse, limit }) } + iterate( + args: ScanStorageArgs = {}, + txId?: TxId + ): AsyncGenerator | Generator { + const { reverse, limit, ...bounds } = args + if (txId) this.log.read(txId, bounds) + return this.storage.iterate({ ...bounds, reverse, limit }) + } + async subscribe( args: ScanStorageArgs, callback: AsyncCallback diff --git a/src/database/async/AsyncTupleDatabaseClient.ts b/src/database/async/AsyncTupleDatabaseClient.ts index a4d8101..bbc943e 100644 --- a/src/database/async/AsyncTupleDatabaseClient.ts +++ b/src/database/async/AsyncTupleDatabaseClient.ts @@ -6,6 +6,7 @@ import { prependPrefixToTuple, prependPrefixToWriteOps, removePrefixFromTuple, + removePrefixFromTupleValuePair, removePrefixFromTupleValuePairs, removePrefixFromWriteOps, } from "../../helpers/subspaceHelpers" @@ -44,6 +45,19 @@ export class AsyncTupleDatabaseClient return result as FilterTupleValuePairByPrefix[] } + async *iterate>( + args: ScanArgs = {}, + txId?: TxId + ): AsyncGenerator> { + const storageScanArgs = normalizeSubspaceScanArgs(this.subspacePrefix, args) + for await (const pair of this.db.iterate(storageScanArgs, txId)) { + yield removePrefixFromTupleValuePair( + this.subspacePrefix, + pair + ) as FilterTupleValuePairByPrefix + } + } + async subscribe>( args: ScanArgs, callback: AsyncCallback> diff --git a/src/database/async/asyncDatabaseTestSuite.ts b/src/database/async/asyncDatabaseTestSuite.ts index 145a115..f39ecba 100644 --- a/src/database/async/asyncDatabaseTestSuite.ts +++ b/src/database/async/asyncDatabaseTestSuite.ts @@ -801,6 +801,584 @@ export function asyncDatabaseTestSuite( } }) + it("iterate gt", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "a", MAX], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate gt/lt", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "a", MAX], + lt: ["a", "c", MIN], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + + const args2 = { + gt: ["a", "b", MIN], + lt: ["a", "b", MAX], + } + const result2: KeyValuePair[] = [] + for await (const pair of store.iterate(args2)) { + result2.push(pair) + } + + assertEqual(result2, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate prefix", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + prefix: ["a", "b"], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate prefix - issue with MAX being true", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: [2, true], value: 1 }, + { key: [2, true, 1], value: 1 }, + { key: [2, true, true], value: 1 }, + { key: [2, true, true, 1], value: 1 }, + { key: [2, true, true, true], value: 1 }, + { key: [2, true, true, true, 1], value: 1 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + prefix: [2], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, items) + }) + + it("iterate prefix gte/lte", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + prefix: ["a", "b"], + gte: ["b"], + lte: ["d"], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + ]) + }) + + it("iterate prefix gte/lte with schema types", async () => { + type Schema = + | { key: ["a", "a", "a"]; value: 1 } + | { key: ["a", "a", "b"]; value: 2 } + | { key: ["a", "a", "c"]; value: 3 } + | { key: ["a", "b", "a"]; value: 4 } + | { key: ["a", "b", "b"]; value: 5 } + | { key: ["a", "b", "c"]; value: 6 } + | { key: ["a", "b", "d"]; value: 6.5 } + | { key: ["a", "c", "a"]; value: 7 } + | { key: ["a", "c", "b"]; value: 8 } + | { key: ["a", "c", "c"]; value: 9 } + + const store = createStorage(randomId()) + + const items: Schema[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const result: KeyValuePair[] = [] + for await (const pair of store.iterate({ + prefix: ["a", "b"], + gte: ["b"], + lte: ["d"], + })) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + ]) + }) + + it("iterate gte", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "b", "a"], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate gte/lte", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "a", "c"], + lte: ["a", "c", MAX], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate gte/lte with schema types", async () => { + type Schema = + | { key: ["a", "a", "a"]; value: 1 } + | { key: ["a", "a", "b"]; value: 2 } + | { key: ["a", "a", "c"]; value: 3 } + | { key: ["a", "b", "a"]; value: 4 } + | { key: ["a", "b", "b"]; value: 5 } + | { key: ["a", "b", "c"]; value: 6 } + | { key: ["a", "c", "a"]; value: 7 } + | { key: ["a", "c", "b"]; value: 8 } + | { key: ["a", "c", "c"]; value: 9 } + + const store = createStorage(randomId()) + + const items: Schema[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const result: KeyValuePair[] = [] + for await (const pair of store.iterate({ + gte: ["a", "a", "c"], + lte: ["a", "c", MAX], + })) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate sorted gt", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "b", MAX], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate sorted gt/lt", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "a", MAX], + lt: ["a", "b", MAX], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate sorted gte", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "b", MIN], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate sorted gte/lte", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "a", "c"], + lte: ["a", "b", MAX], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate invalid bounds", async () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + await transaction.commit() + const data = await store.scan() + assertEqual(data, items) + + try { + const args = { + gte: ["a", "c"], + lte: ["a", "a"], + } + const result: KeyValuePair[] = [] + for await (const pair of store.iterate(args)) { + result.push(pair) + } + assert.fail("Should fail.") + } catch (error) { + assert.ok(error) + } + }) + it("stores all types of values", async () => { const store = createStorage(randomId()) const items: KeyValuePair[] = sortedValues.map( diff --git a/src/database/async/asyncTypes.ts b/src/database/async/asyncTypes.ts index dc76982..2a97bf2 100644 --- a/src/database/async/asyncTypes.ts +++ b/src/database/async/asyncTypes.ts @@ -10,6 +10,7 @@ import { ScanArgs, TxId, Unsubscribe } from "../types" /** The low-level API for implementing new storage layers. */ export type AsyncTupleStorageApi = { scan: (args?: ScanStorageArgs) => Promise + iterate: (args?: ScanStorageArgs) => AsyncGenerator commit: (writes: WriteOps) => Promise close: () => Promise } @@ -17,6 +18,10 @@ export type AsyncTupleStorageApi = { /** Wraps AsyncTupleStorageApi with reactivity and MVCC */ export type AsyncTupleDatabaseApi = { scan: (args?: ScanStorageArgs, txId?: TxId) => Promise + iterate: ( + args?: ScanStorageArgs, + txId?: TxId + ) => AsyncGenerator | Generator commit: (writes: WriteOps, txId?: TxId) => Promise cancel: (txId: string) => Promise subscribe: ( @@ -36,6 +41,12 @@ export type AsyncTupleDatabaseClientApi = args?: ScanArgs, txId?: TxId ) => Promise[]> + iterate: >( + args?: ScanArgs, + txId?: TxId + ) => + | AsyncGenerator> + | Generator> subscribe: >( args: ScanArgs, callback: AsyncCallback> diff --git a/src/database/async/subscribeQueryAsync.ts b/src/database/async/subscribeQueryAsync.ts index 5ee33d6..79e901a 100644 --- a/src/database/async/subscribeQueryAsync.ts +++ b/src/database/async/subscribeQueryAsync.ts @@ -56,6 +56,14 @@ export async function subscribeQueryAsync( const results = await db.scan(args) return results }, + iterate: (args: any, txId) => { + const destroy = db.subscribe(args, async (_writes, txId) => + recomputeQueue.enqueue(() => recompute(txId)) + ) + listeners.add(destroy) + + return db.iterate(args) + }, cancel: async (txId) => { await db.cancel(txId) }, diff --git a/src/database/sync/TupleDatabase.ts b/src/database/sync/TupleDatabase.ts index 5d7a156..bb06d78 100644 --- a/src/database/sync/TupleDatabase.ts +++ b/src/database/sync/TupleDatabase.ts @@ -27,6 +27,15 @@ export class TupleDatabase implements TupleDatabaseApi { return this.storage.scan({ ...bounds, reverse, limit }) } + iterate( + args: ScanStorageArgs = {}, + txId?: TxId + ): Identity> { + const { reverse, limit, ...bounds } = args + if (txId) this.log.read(txId, bounds) + return this.storage.iterate({ ...bounds, reverse, limit }) + } + subscribe(args: ScanStorageArgs, callback: Callback): Identity { return this.reactivity.subscribe(args, callback) } diff --git a/src/database/sync/TupleDatabaseClient.ts b/src/database/sync/TupleDatabaseClient.ts index f71c5a0..0d89538 100644 --- a/src/database/sync/TupleDatabaseClient.ts +++ b/src/database/sync/TupleDatabaseClient.ts @@ -14,6 +14,7 @@ import { prependPrefixToTuple, prependPrefixToWriteOps, removePrefixFromTuple, + removePrefixFromTupleValuePair, removePrefixFromTupleValuePairs, removePrefixFromWriteOps, } from "../../helpers/subspaceHelpers" @@ -51,6 +52,19 @@ export class TupleDatabaseClient return result as FilterTupleValuePairByPrefix[] } + *iterate>( + args: ScanArgs = {}, + txId?: TxId + ): Identity>> { + const storageScanArgs = normalizeSubspaceScanArgs(this.subspacePrefix, args) + for (const pair of this.db.iterate(storageScanArgs, txId)) { + yield removePrefixFromTupleValuePair( + this.subspacePrefix, + pair + ) as FilterTupleValuePairByPrefix + } + } + subscribe>( args: ScanArgs, callback: Callback> diff --git a/src/database/sync/databaseTestSuite.ts b/src/database/sync/databaseTestSuite.ts index 3738610..bd6ed0e 100644 --- a/src/database/sync/databaseTestSuite.ts +++ b/src/database/sync/databaseTestSuite.ts @@ -806,6 +806,584 @@ export function databaseTestSuite( } }) + it("iterate gt", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "a", MAX], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate gt/lt", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "a", MAX], + lt: ["a", "c", MIN], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + + const args2 = { + gt: ["a", "b", MIN], + lt: ["a", "b", MAX], + } + const result2: KeyValuePair[] = [] + for (const pair of store.iterate(args2)) { + result2.push(pair) + } + + assertEqual(result2, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate prefix", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + prefix: ["a", "b"], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate prefix - issue with MAX being true", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: [2, true], value: 1 }, + { key: [2, true, 1], value: 1 }, + { key: [2, true, true], value: 1 }, + { key: [2, true, true, 1], value: 1 }, + { key: [2, true, true, true], value: 1 }, + { key: [2, true, true, true, 1], value: 1 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + prefix: [2], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, items) + }) + + it("iterate prefix gte/lte", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + prefix: ["a", "b"], + gte: ["b"], + lte: ["d"], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + ]) + }) + + it("iterate prefix gte/lte with schema types", () => { + type Schema = + | { key: ["a", "a", "a"]; value: 1 } + | { key: ["a", "a", "b"]; value: 2 } + | { key: ["a", "a", "c"]; value: 3 } + | { key: ["a", "b", "a"]; value: 4 } + | { key: ["a", "b", "b"]; value: 5 } + | { key: ["a", "b", "c"]; value: 6 } + | { key: ["a", "b", "d"]; value: 6.5 } + | { key: ["a", "c", "a"]; value: 7 } + | { key: ["a", "c", "b"]; value: 8 } + | { key: ["a", "c", "c"]; value: 9 } + + const store = createStorage(randomId()) + + const items: Schema[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const result: KeyValuePair[] = [] + for (const pair of store.iterate({ + prefix: ["a", "b"], + gte: ["b"], + lte: ["d"], + })) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "b", "d"], value: 6.5 }, + ]) + }) + + it("iterate gte", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "b", "a"], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate gte/lte", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "a", "c"], + lte: ["a", "c", MAX], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate gte/lte with schema types", () => { + type Schema = + | { key: ["a", "a", "a"]; value: 1 } + | { key: ["a", "a", "b"]; value: 2 } + | { key: ["a", "a", "c"]; value: 3 } + | { key: ["a", "b", "a"]; value: 4 } + | { key: ["a", "b", "b"]; value: 5 } + | { key: ["a", "b", "c"]; value: 6 } + | { key: ["a", "c", "a"]; value: 7 } + | { key: ["a", "c", "b"]; value: 8 } + | { key: ["a", "c", "c"]; value: 9 } + + const store = createStorage(randomId()) + + const items: Schema[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const result: KeyValuePair[] = [] + for (const pair of store.iterate({ + gte: ["a", "a", "c"], + lte: ["a", "c", MAX], + })) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate sorted gt", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "b", MAX], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate sorted gt/lt", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gt: ["a", "a", MAX], + lt: ["a", "b", MAX], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate sorted gte", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "b", MIN], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ]) + }) + + it("iterate sorted gte/lte", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + const args = { + gte: ["a", "a", "c"], + lte: ["a", "b", MAX], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + + assertEqual(result, [ + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + ]) + }) + + it("iterate invalid bounds", () => { + const store = createStorage(randomId()) + + const items: KeyValuePair[] = [ + { key: ["a", "a", "a"], value: 1 }, + { key: ["a", "a", "b"], value: 2 }, + { key: ["a", "a", "c"], value: 3 }, + { key: ["a", "b", "a"], value: 4 }, + { key: ["a", "b", "b"], value: 5 }, + { key: ["a", "b", "c"], value: 6 }, + { key: ["a", "c", "a"], value: 7 }, + { key: ["a", "c", "b"], value: 8 }, + { key: ["a", "c", "c"], value: 9 }, + ] + const transaction = store.transact() + for (const { key, value } of _.shuffle(items)) { + transaction.set(key, value) + } + transaction.commit() + const data = store.scan() + assertEqual(data, items) + + try { + const args = { + gte: ["a", "c"], + lte: ["a", "a"], + } + const result: KeyValuePair[] = [] + for (const pair of store.iterate(args)) { + result.push(pair) + } + assert.fail("Should fail.") + } catch (error) { + assert.ok(error) + } + }) + it("stores all types of values", () => { const store = createStorage(randomId()) const items: KeyValuePair[] = sortedValues.map( diff --git a/src/database/sync/subscribeQuery.ts b/src/database/sync/subscribeQuery.ts index e9799a2..26cbd57 100644 --- a/src/database/sync/subscribeQuery.ts +++ b/src/database/sync/subscribeQuery.ts @@ -64,6 +64,14 @@ export function subscribeQuery( const results = db.scan(args) return results }, + iterate: (args: any, txId) => { + const destroy = db.subscribe(args, (_writes, txId) => + recomputeQueue.enqueue(() => recompute(txId)) + ) + listeners.add(destroy) + + return db.iterate(args) + }, cancel: (txId) => { db.cancel(txId) }, diff --git a/src/database/sync/types.ts b/src/database/sync/types.ts index ff807ac..4e325a3 100644 --- a/src/database/sync/types.ts +++ b/src/database/sync/types.ts @@ -18,6 +18,7 @@ import { ScanArgs, TxId, Unsubscribe } from "../types" /** The low-level API for implementing new storage layers. */ export type TupleStorageApi = { scan: (args?: ScanStorageArgs) => Identity + iterate: (args?: ScanStorageArgs) => Identity> commit: (writes: WriteOps) => Identity close: () => Identity } @@ -25,6 +26,10 @@ export type TupleStorageApi = { /** Wraps TupleStorageApi with reactivity and MVCC */ export type TupleDatabaseApi = { scan: (args?: ScanStorageArgs, txId?: TxId) => Identity + iterate: ( + args?: ScanStorageArgs, + txId?: TxId + ) => Identity> commit: (writes: WriteOps, txId?: TxId) => Identity cancel: (txId: string) => Identity subscribe: ( @@ -43,6 +48,10 @@ export type TupleDatabaseClientApi = { args?: ScanArgs, txId?: TxId ) => Identity[]> + iterate: >( + args?: ScanArgs, + txId?: TxId + ) => Identity>> subscribe: >( args: ScanArgs, callback: Callback> diff --git a/src/helpers/DelayDb.ts b/src/helpers/DelayDb.ts index 981be4c..4f27416 100644 --- a/src/helpers/DelayDb.ts +++ b/src/helpers/DelayDb.ts @@ -15,6 +15,12 @@ export function DelayDb( await sleep(delay) return db.scan(...args) }, + iterate: async function* (...args) { + for await (const res of db.iterate(...args)) { + await sleep(delay) + yield res + } + }, commit: async (...args) => { await sleep(delay) return db.commit(...args) diff --git a/src/helpers/sortedList.ts b/src/helpers/sortedList.ts index f921408..113ec39 100644 --- a/src/helpers/sortedList.ts +++ b/src/helpers/sortedList.ts @@ -40,7 +40,7 @@ type ScanArgs = { reverse?: boolean } -export function scan(list: T[], args: ScanArgs, cmp: Compare) { +function getScanBounds(list: T[], args: ScanArgs, cmp: Compare) { const start = args.gte || args.gt const end = args.lte || args.lt @@ -84,7 +84,27 @@ export function scan(list: T[], args: ScanArgs, cmp: Compare) { ? Math.min(lowerSearchBound + args.limit, upperSearchBound) : upperSearchBound + return { lowerDataBound, upperDataBound } +} + +export function scan(list: T[], args: ScanArgs, cmp: Compare) { + const { lowerDataBound, upperDataBound } = getScanBounds(list, args, cmp) + return args.reverse ? list.slice(lowerDataBound, upperDataBound).reverse() : list.slice(lowerDataBound, upperDataBound) } + +export function* iterate(list: T[], args: ScanArgs, cmp: Compare) { + const { lowerDataBound, upperDataBound } = getScanBounds(list, args, cmp) + + if (args.reverse) { + for (let i = upperDataBound - 1; i >= lowerDataBound; i--) { + yield list[i] + } + } else { + for (let i = lowerDataBound; i < upperDataBound; i++) { + yield list[i] + } + } +} diff --git a/src/helpers/sortedTupleValuePairs.ts b/src/helpers/sortedTupleValuePairs.ts index b04c302..dee0cc6 100644 --- a/src/helpers/sortedTupleValuePairs.ts +++ b/src/helpers/sortedTupleValuePairs.ts @@ -70,3 +70,13 @@ export function scan(data: KeyValuePair[], args: ScanArgs = {}) { compareTupleValuePair ) } + +export function iterate(data: KeyValuePair[], args: ScanArgs = {}) { + const { limit, reverse, ...rest } = args + const bounds = normalizeTupleValuePairBounds(rest) + return sortedList.iterate( + data, + { limit, reverse, ...bounds }, + compareTupleValuePair + ) +} diff --git a/src/helpers/subspaceHelpers.ts b/src/helpers/subspaceHelpers.ts index a2b99cc..23b7a78 100644 --- a/src/helpers/subspaceHelpers.ts +++ b/src/helpers/subspaceHelpers.ts @@ -75,7 +75,7 @@ function removePrefixFromTuples(prefix: Tuple, tuples: Tuple[]) { return tuples.map((tuple) => removePrefixFromTuple(prefix, tuple)) } -function removePrefixFromTupleValuePair( +export function removePrefixFromTupleValuePair( prefix: Tuple, pair: KeyValuePair ): KeyValuePair { diff --git a/src/storage/InMemoryTupleStorage.ts b/src/storage/InMemoryTupleStorage.ts index 0520085..e2e4cf2 100644 --- a/src/storage/InMemoryTupleStorage.ts +++ b/src/storage/InMemoryTupleStorage.ts @@ -13,6 +13,10 @@ export class InMemoryTupleStorage implements TupleStorageApi { return tv.scan(this.data, args) } + iterate(args?: ScanStorageArgs) { + return tv.iterate(this.data, args) + } + commit(writes: WriteOps) { // Indexers run inside the tx so we don't need to do that here. // And because of that, the order here should not matter.