Skip to content

Commit

Permalink
feat(RevertController): support revert in insert/update/delete
Browse files Browse the repository at this point in the history
  • Loading branch information
Brooooooklyn committed Aug 17, 2017
1 parent d84a22e commit e853646
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 96 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"license": "MIT",
"devDependencies": {
"@types/chai": "^4.0.3",
"@types/lodash": "^4.14.73",
"@types/node": "^8.0.23",
"@types/shelljs": "^0.7.4",
"@types/sinon": "^2.3.3",
Expand All @@ -71,6 +72,7 @@
"extract-text-webpack-plugin": "^3.0.0",
"happypack": "^4.0.0-beta.2",
"html-webpack-plugin": "^2.30.1",
"lodash": "^4.17.4",
"madge": "^2.0.0",
"moment": "^2.18.1",
"node-watch": "^0.5.5",
Expand Down
1 change: 1 addition & 0 deletions src/exception/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './database'
export * from './token'
export * from './revert'
10 changes: 10 additions & 0 deletions src/exception/revert.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { ReactiveDBException } from './Exception'

export const tokenMustBeSymbol = (token: any) =>
new ReactiveDBException(`Symbol type expected, but got ${ typeof token }: ${ token }`)

export const clauseMissingError = () =>
new ReactiveDBException('Clause must be specified when when reverControoler is passed to delete method')

export const revertBeforeOperationSuccessError = () =>
new ReactiveDBException('You can only revert after the operation that you passed RevertController success')
257 changes: 167 additions & 90 deletions src/storage/Database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import * as Exception from '../exception'
import * as typeDefinition from './helper/definition'
import Version from '../version'
import { Traversable } from '../shared'
import { Mutation, Selector, QueryToken, PredicateProvider, checkPredicate, predicateOperatorNames } from './modules'
import { Mutation, Selector, QueryToken, PredicateProvider, checkPredicate, predicateOperatorNames, RevertController } from './modules'
import { dispose, contextTableName, fieldIdentifier, hiddenColName } from './symbols'
import { forEach, clone, contains, tryCatch, hasOwn, getType, assert, identity, warn, keys as objKeys } from '../utils'
import { createPredicate, createPkClause, mergeTransactionResult, predicatableQuery, lfFactory } from './helper'
import { createPredicate, createPkClause, predicatableQuery, lfFactory, executor } from './helper'
import { Relationship, RDBType, DataStoreType, LeafType, StatementType, JoinMode } from '../interface/enum'
import { Record, Fields, JoinInfo, Query, Predicate } from '../interface'
import { SchemaDef, ColumnDef, ParsedSchema, Association, ScopedHandler } from '../interface'
Expand Down Expand Up @@ -110,57 +110,51 @@ export class Database {
})
}

insert<T>(tableName: string, raw: T[]): Observable<ExecutorResult>
insert<T>(tableName: string, raw: T[], revertController?: RevertController): Observable<ExecutorResult>

insert<T>(tableName: string, raw: T): Observable<ExecutorResult>
insert<T>(tableName: string, raw: T, revertController?: RevertController): Observable<ExecutorResult>

insert<T>(tableName: string, raw: T | T[]): Observable<ExecutorResult>
insert<T>(tableName: string, raw: T | T[], revertController?: RevertController): Observable<ExecutorResult>

insert<T>(tableName: string, raw: T | T[]): Observable<ExecutorResult> {
return this.database$
.concatMap(db => {
const schema = this.findSchema(tableName)
const pk = schema.pk
const columnMapper = schema.mapper
const [ table ] = Database.getTables(db, tableName)
const muts: Mutation[] = []
const entities = clone(raw)

const iterator = Array.isArray(entities) ? entities : [entities]

iterator.forEach((entity: any) => {
const mut = new Mutation(db, table)
const hiddenPayload = Object.create(null)

columnMapper.forEach((mapper, key) => {
// cannot create a hidden column for primary key
if (!hasOwn(entity, key) || key === pk) {
return
insert<T>(tableName: string, raw: T | T[], revertController?: RevertController): Observable<ExecutorResult> {
return this.database$.concatMap(db => {
const { queries, contextIds } = this.buildInsertQuery(db, tableName, raw)
const schema = this.findSchema(tableName)
const pk = schema.pk
const [ table ] = Database.getTables(db, tableName)
return Observable.fromPromise(executor(db, queries))
.do({
next: () => {
if (revertController) {
const equalClause = (Array.isArray(raw) ? raw : [raw])
.map(data => ({
[pk]: data[pk]
}))
const clause = { $or: equalClause }
const tablesStruct: TablesStruct = {
[tableName]: {
table, contextName: tableName
}
}
const provider = new PredicateProvider(tablesStruct, tableName, clause)
const deleteQuery =
predicatableQuery(db, table, provider.getPredicate(), StatementType.Delete)
revertController.inject(
db, [ deleteQuery ]
)
}

const val = entity[key]
hiddenPayload[key] = mapper(val)
hiddenPayload[hiddenColName(key)] = val
})

mut.patch({ ...entity, ...hiddenPayload })
mut.withId(pk, entity[pk])
muts.push(mut)
},
error: () => contextIds.forEach(id => this.storedIds.delete(id))
})

const { contextIds, queries } = Mutation.aggregate(db, muts, [])
contextIds.forEach(id => this.storedIds.add(id))
return this.executor(db, queries)
.do({ error: () => contextIds.forEach(id => this.storedIds.delete(id)) })
})
})
}

get<T>(tableName: string, query: Query<T> = {}, mode: JoinMode = JoinMode.imlicit): QueryToken<T> {
const selector$ = this.buildSelector<T>(tableName, query, mode)
return new QueryToken<T>(selector$)
}

update<T>(tableName: string, clause: Predicate<T>, raw: Partial<T>): Observable<ExecutorResult> {
update<T>(tableName: string, clause: Predicate<T>, raw: Partial<T>, revertController?: RevertController): Observable<ExecutorResult> {
const type = getType(raw)
if (type !== 'Object') {
return Observable.throw(Exception.InvalidType(['Object', type]))
Expand All @@ -172,7 +166,7 @@ export class Database {
}

return this.database$
.concatMap<any, any>(db => {
.concatMap(db => {
const entity = clone(raw)
const [ table ] = Database.getTables(db, tableName)
const columnMapper = schema!.mapper
Expand Down Expand Up @@ -210,42 +204,80 @@ export class Database {
}
})

return this.executor(db, [query])
if (revertController) {
const columns = Object.keys(raw).map(key => table[key])
predicatableQuery(db, table, predicate!, StatementType.Select, ...columns)
.exec()
.then(([ value ]) => {
const revertQuery = predicatableQuery(db, table, predicate!, StatementType.Update)
forEach(value, (val, key) => {
const column = table[key]
revertQuery.set(column, val)
})
revertController.inject(
db, [ revertQuery ]
)
})
.catch(e => {
revertController.giveup()
warn(e)
})
}

return Observable.fromPromise(executor(db, [ query ]))
.do({
error: () => {
if (revertController) {
revertController.giveup()
}
}
})
})
}

delete<T>(tableName: string, clause: Predicate<T> = {}): Observable<ExecutorResult> {
delete<T>(tableName: string, clause: Predicate<T> = {}, revertController?: RevertController): Observable<ExecutorResult> {
const [pk, err] = tryCatch<string>(this.findPrimaryKey)(tableName)
if (err) {
return Observable.throw(err)
}

return this.database$
.concatMap(db => {
const [ table ] = Database.getTables(db, tableName)
const tables = this.buildTablesStructure(table)
const column = table[pk!]
const provider = new PredicateProvider(tables, tableName, clause)
const prefetch =
predicatableQuery(db, table, provider.getPredicate(), StatementType.Select, column)

return Observable.fromPromise(prefetch.exec())
.concatMap((scopedIds) => {
const predicate = provider.getPredicate()
if (!predicate) {
warn(`The result of parsed Predicate is null, you are deleting all ${ tableName } Table!`)
}
const query = predicatableQuery(db, table, predicate, StatementType.Delete)

scopedIds.forEach((entity: any) =>
this.storedIds.delete(fieldIdentifier(tableName, entity[pk!])))
if (revertController) {
if (!clause) {
throw Exception.clauseMissingError()
}
}

return this.executor(db, [query]).do({ error: () => {
scopedIds.forEach((entity: any) =>
this.storedIds.add(fieldIdentifier(tableName, entity[pk!])))
}})
})
})
return this.database$.concatMap(db => {
const [ table ] = Database.getTables(db, tableName)
const tablesStruct = this.buildTablesStructure(table)
const provider = new PredicateProvider(tablesStruct, tableName, clause)
const predicate = provider.getPredicate()
if (!predicate) {
warn(`The result of parsed Predicate is null, you are deleting all ${ tableName } Tables!`)
}
const query = predicatableQuery(db, table, predicate, StatementType.Delete)
const columns = revertController ? [] as any : [ pk ]
return Observable.fromPromise(
this.deletePrefetch(db, table, provider, columns)
)
.concatMap(scopedIds =>
Observable.fromPromise(executor(db, [query]))
.do({
next: () => {
if (revertController) {
const { queries } = this.buildInsertQuery(db, tableName, scopedIds)
revertController.inject(db, queries)
}
scopedIds.forEach((entity: any) =>
this.storedIds.delete(fieldIdentifier(tableName, entity[pk!])))
},
error: () => {
scopedIds.forEach((entity: any) =>
this.storedIds.add(fieldIdentifier(tableName, entity[pk!])))
}
})
)
})
}

upsert<T>(tableName: string, raw: T): Observable<ExecutorResult>
Expand All @@ -264,7 +296,7 @@ export class Database {
const { contextIds, queries } = Mutation.aggregate(db, insert, update)
if (queries.length > 0) {
contextIds.forEach(id => this.storedIds.add(id))
return this.executor(db, queries)
return Observable.fromPromise(executor(db, queries))
.do({ error: () => contextIds.forEach(id => this.storedIds.delete(id)) })
} else {
return Observable.of({ result: false, insert: 0, update: 0, delete: 0, select: 0 })
Expand All @@ -289,7 +321,7 @@ export class Database {
}

const queries: lf.query.Builder[] = []
const removedIds: any = []
const removedIds: string[] = []
queries.push(predicatableQuery(db, table, predicate!, StatementType.Delete))

const prefetch = predicatableQuery(db, table, predicate!, StatementType.Select)
Expand All @@ -303,10 +335,10 @@ export class Database {
const scope = this.createScopedHandler<T>(db, queries, removedIds)
return disposeHandler(rootEntities, scope)
.do(() => removedIds.forEach((id: string) => this.storedIds.delete(id)))
.concatMap(() => this.executor(db, queries))
.concatMap(() => executor(db, queries))
} else {
removedIds.forEach((id: string) => this.storedIds.delete(id))
return this.executor(db, queries)
return executor(db, queries)
}
})
.do({ error: () =>
Expand All @@ -329,7 +361,7 @@ export class Database {
db.getSchema().tables().map(t => db.delete().from(t)))

return this.database$.concatMap(db => {
return cleanup.concatMap(queries => this.executor(db, queries))
return cleanup.concatMap(queries => executor(db, queries))
.do(() => {
db.close()
this.schemas.clear()
Expand Down Expand Up @@ -519,6 +551,67 @@ export class Database {
}
}

private buildInsertQuery<T>(db: lf.Database, tableName: string, raw: T | T[]) {
const schema = this.findSchema(tableName)
const pk = schema.pk
const columnMapper = schema.mapper
const [ table ] = Database.getTables(db, tableName)
const muts: Mutation[] = []
const entities = clone(raw)

const iterator = Array.isArray(entities) ? entities : [entities]

iterator.forEach((entity: any) => {
const mut = new Mutation(db, table)
const hiddenPayload = Object.create(null)

columnMapper.forEach((mapper, key) => {
// cannot create a hidden column for primary key
if (!hasOwn(entity, key) || key === pk) {
return
}

const val = entity[key]
hiddenPayload[key] = mapper(val)
hiddenPayload[hiddenColName(key)] = val
})

mut.patch({ ...entity, ...hiddenPayload })
mut.withId(pk, entity[pk])
muts.push(mut)
})

const { contextIds, queries } = Mutation.aggregate(db, muts, [])
contextIds.forEach(id => this.storedIds.add(id))

return { queries, contextIds }
}

private deletePrefetch<T>(
db: lf.Database,
table: lf.schema.Table,
provider: PredicateProvider<T>,
columnNames: string[]
) {
let columns: lf.schema.Column[]
// build revert query
if (!columnNames.length) {
const tableName = table.getName()
columns = Array.from(this.findSchema(tableName).columns.keys())
.map(columnName => {
const column = table[columnName]
const hiddenName = hiddenColName(columnName)
const hiddenCol = table[hiddenName]
return hiddenCol || column
})
} else {
columns = columnNames.map(columnName => table[columnName])
}
const prefetch =
predicatableQuery(db, table, provider.getPredicate(), StatementType.Select, ...columns)
return prefetch.exec()
}

// context 用来标记DFS路径中的所有出现过的表,用于解决self-join时的二义性
// path 用来标记每个查询路径上出现的表,用于解决circular reference
private traverseQueryFields(
Expand Down Expand Up @@ -951,20 +1044,4 @@ export class Database {
return newTableName!
}

private executor(db: lf.Database, queries: lf.query.Builder[]) {
const tx = db.createTransaction()
const handler = {
error: () => warn(`Execute failed, transaction is already marked for rollback.`)
}

return Observable.fromPromise(tx.exec(queries))
.do(handler)
.map((ret) => {
return {
result: true,
...mergeTransactionResult(queries, ret)
}
})
}

}
17 changes: 17 additions & 0 deletions src/storage/helper/executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { mergeTransactionResult } from './merge-transaction-result'
import { ExecutorResult } from '../../interface'
import { warn } from '../../utils'

export function executor(db: lf.Database, queries: lf.query.Builder[]): Promise<ExecutorResult> {
const tx = db.createTransaction()

return tx.exec(queries)
.then(ret => ({
result: true,
...mergeTransactionResult(queries, ret)
}))
.catch((e: Error) => {
warn(`Execute failed, transaction is already marked for rollback.`)
throw e
})
}
Loading

0 comments on commit e853646

Please sign in to comment.