Skip to content

Commit

Permalink
Major performance refactor with a few breaking changes.
Browse files Browse the repository at this point in the history
- Change how tables and primary keys are fetched to minimize reads.
- Allow for more efficient bulk writing in underlying implementation.
- Rework abstractions to allow exposing Sqlite batches.
- Rename classes to better reflect their goals.
- Correctly identify and forbid semicolon separated statements.
  • Loading branch information
cachapa committed Mar 3, 2024
1 parent 6d0870b commit 77f0c7b
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 267 deletions.
4 changes: 2 additions & 2 deletions .idea/libraries/Dart_Packages.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 3.0.0

Major performance refactor with a few breaking changes.

- Change how tables and their primary keys are fetched to minimize read operations.
- Allow for more efficient bulk writing benefiting from underlying db features.
- Rework abstractions to allow exposing Sqlite batches.
- Rename classes to better reflect their goals.
- Correctly identify and forbid semicolon separated statements.

## 2.1.7

- Add support for inserts from select queries
Expand Down
3 changes: 2 additions & 1 deletion lib/sql_crdt.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ library sql_crdt;

export 'package:crdt/crdt.dart';

export 'src/base_crdt.dart';
export 'src/crdt_executor.dart';
export 'src/database_api.dart';
export 'src/sql_crdt.dart';
120 changes: 0 additions & 120 deletions lib/src/base_crdt.dart

This file was deleted.

144 changes: 125 additions & 19 deletions lib/src/timestamped_crdt.dart → lib/src/crdt_executor.dart
Original file line number Diff line number Diff line change
@@ -1,18 +1,125 @@
part of 'base_crdt.dart';
import 'package:crdt/crdt.dart';
import 'package:sqlparser/sqlparser.dart';
import 'package:sqlparser/utils/node_to_text.dart';

abstract class TimestampedCrdt extends BaseCrdt {
Hlc get canonicalTime;
import 'database_api.dart';

String get nodeId => canonicalTime.nodeId;
final _sqlEngine = SqlEngine();

TimestampedCrdt(super.db);
/// Intercepts CREATE TABLE queries to assist with table creation and updates.
/// Does not affect any other query types.
class CrdtTableExecutor {
final WriteApi _db;

CrdtTableExecutor(this._db);

/// Executes a SQL query with an optional [args] list.
/// Use "?" placeholders for parameters to avoid injection vulnerabilities:
///
/// ```
/// await crdt.execute(
/// 'INSERT INTO users (id, name) Values (?1, ?2)', [1, 'John Doe']);
/// ```
Future<void> execute(String sql, [List<Object?>? args]) async {
// Break query into individual statements
final statements =
(_sqlEngine.parseMultiple(sql).rootNode as SemicolonSeparatedStatements)
.statements;
assert(statements.length == 1,
'This package does not support compound statements:\n$sql');

final statement = statements.first;

// Bail on "manual" transaction statements
if (statement is BeginTransactionStatement ||
statement is CommitStatement) {
throw 'Unsupported statement: ${statement.toSql()}.\nUse transaction() instead.';
}

await _executeStatement(statement, args);
}

Future<void> _executeStatement(Statement statement, List<Object?>? args) =>
statement is CreateTableStatement
? _createTable(statement, args)
: _execute(statement, args);

Future<String> _createTable(
CreateTableStatement statement, List<Object?>? args) async {
final newStatement = CreateTableStatement(
tableName: statement.tableName,
columns: [
...statement.columns,
ColumnDefinition(
columnName: 'is_deleted',
typeName: 'INTEGER',
constraints: [Default(null, NumericLiteral(0))],
),
ColumnDefinition(
columnName: 'hlc',
typeName: 'TEXT',
constraints: [NotNull(null)],
),
ColumnDefinition(
columnName: 'node_id',
typeName: 'TEXT',
constraints: [NotNull(null)],
),
ColumnDefinition(
columnName: 'modified',
typeName: 'TEXT',
constraints: [NotNull(null)],
),
],
tableConstraints: statement.tableConstraints,
ifNotExists: statement.ifNotExists,
isStrict: statement.isStrict,
withoutRowId: statement.withoutRowId,
);

await _execute(newStatement, args);

return newStatement.tableName;
}

Future<String?> _execute(Statement statement, List<Object?>? args) async {
final sql = statement is InvalidStatement
? statement.span?.text
: statement.toSql();
if (sql == null) return null;

await _db.execute(sql, args);
return null;
}
}

class CrdtExecutor extends CrdtTableExecutor {
final Hlc hlc;
late final _hlcString = hlc.toString();

final affectedTables = <String>{};

CrdtExecutor(super._db, this.hlc);

@override
Future<void> _insert(InsertStatement statement, List<Object?>? args,
[Hlc? hlc]) async {
Future<void> _executeStatement(
Statement statement, List<Object?>? args) async {
final table = await switch (statement) {
CreateTableStatement statement => _createTable(statement, args),
InsertStatement statement => _insert(statement, args),
UpdateStatement statement => _update(statement, args),
DeleteStatement statement => _delete(statement, args),
// Else, run the query unchanged
_ => _execute(statement, args)
};
if (table != null) affectedTables.add(table);
}

Future<String> _insert(InsertStatement statement, List<Object?>? args) async {
// Force explicit column description in insert statements
assert(statement.targetColumns.isNotEmpty,
'Unsupported statement: target columns must be explicitly stated.\n${statement.toSql()}');

// Disallow star select statements
assert(
statement.source is! SelectInsertSource ||
Expand Down Expand Up @@ -88,14 +195,13 @@ abstract class TimestampedCrdt extends BaseCrdt {
}
}

hlc ??= canonicalTime;
args = [...args ?? [], hlc, hlc.nodeId, hlc];
args = [...args ?? [], _hlcString, hlc.nodeId, _hlcString];
await _execute(newStatement, args);

return newStatement.table.tableName;
}

@override
Future<void> _update(UpdateStatement statement, List<Object?>? args,
[Hlc? hlc]) async {
Future<String> _update(UpdateStatement statement, List<Object?>? args) async {
final argCount = args?.length ?? 0;
final newStatement = UpdateStatement(
withClause: statement.withClause,
Expand All @@ -121,14 +227,13 @@ abstract class TimestampedCrdt extends BaseCrdt {
where: statement.where,
);

hlc ??= canonicalTime;
args = [...args ?? [], hlc, hlc.nodeId, hlc];
args = [...args ?? [], _hlcString, hlc.nodeId, _hlcString];
await _execute(newStatement, args);

return newStatement.table.tableName;
}

@override
Future<void> _delete(DeleteStatement statement, List<Object?>? args,
[Hlc? hlc]) async {
Future<String> _delete(DeleteStatement statement, List<Object?>? args) async {
final argCount = args?.length ?? 0;
final newStatement = UpdateStatement(
returning: statement.returning,
Expand All @@ -155,8 +260,9 @@ abstract class TimestampedCrdt extends BaseCrdt {
where: statement.where,
);

hlc ??= canonicalTime;
args = [...args ?? [], 1, hlc, hlc.nodeId, hlc];
args = [...args ?? [], 1, _hlcString, hlc.nodeId, _hlcString];
await _execute(newStatement, args);

return newStatement.table.tableName;
}
}
Loading

0 comments on commit 77f0c7b

Please sign in to comment.