Multiple tables updates in a transaction sometimes have out of sync responses in the watch streams #3338

Open
AhmedLSayed9 opened this issue Nov 13, 2024 · 1 comment

Comments

@AhmedLSayed9
Contributor

AhmedLSayed9 commented Nov 13, 2024

If we update multiple tables in a transaction while listening to updates on those tables, the watch streams sometimes don't emit their updates together in the same frame.

This causes issues if we merge both streams using combineLatest2, as in the "Selecting a cart" example, and expect the updates to be emitted at the same time (in our use case a cart must always have items, so a cart with no items must never be observed).

Minimal example:
database.dart:

import 'package:drift/drift.dart';
import 'package:drift_flutter/drift_flutter.dart';

part 'database.g.dart';

class TodoItems extends Table {
  IntColumn get id => integer().autoIncrement()();
  TextColumn get title => text()();
  IntColumn get category => integer().nullable().references(TodoCategory, #id)();
}

class TodoCategory extends Table {
  IntColumn get id => integer().autoIncrement()();
  TextColumn get description => text()();
}

@DriftDatabase(tables: [TodoItems, TodoCategory])
class AppDatabase extends _$AppDatabase {
  // After generating code, this class needs to define a schemaVersion getter
  // and a constructor telling drift where the database should be stored.
  // These are described in the getting started guide: https://drift.simonbinder.eu/getting-started/#open
  AppDatabase() : super(_openConnection());

  @override
  int get schemaVersion => 1;

  static QueryExecutor _openConnection() {
    // driftDatabase from package:drift_flutter stores the database in
    // getApplicationDocumentsDirectory().
    return driftDatabase(name: 'my_database');
  }
}

main.dart:

import 'package:drift/drift.dart';
import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';
import 'package:testapp/drift/database.dart';

final database = AppDatabase();

Stream<({List<TodoCategoryData> todoCategories, List<TodoItem> todoItems})>
    fetchTodoWithCategory() {
  final todoCategoryStream = database.managers.todoCategory.watch();
  final todoItemsStream = database.managers.todoItems.watch();

  return Rx.combineLatest2(todoCategoryStream, todoItemsStream, (categories, items) {
    return (todoCategories: categories, todoItems: items);
  });
}

Future<void> createCategoryAndTodo() async {
  return database.transaction(() async {
    await database.managers.todoCategory.create(
      (o) => o(id: const Value(1), description: 'Description'),
      mode: InsertMode.insertOrIgnore,
    );
    await database.managers.todoItems.create(
      (o) => o(category: const Value(1), title: 'Title'),
    );
  });
}

void main() async {
  WidgetsFlutterBinding.ensureInitialized();
  await database.managers.todoCategory.delete();
  await database.managers.todoItems.delete();
  runApp(const MyApp());
}

class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    return const MaterialApp(
      home: HomePage(),
    );
  }
}

class HomePage extends StatefulWidget {
  const HomePage({super.key});

  @override
  HomePageState createState() => HomePageState();
}

class HomePageState extends State<HomePage> {
  final stream = fetchTodoWithCategory();

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('CombineLatest2 Example')),
      body: Center(
        child: StreamBuilder(
          stream: stream,
          builder: (context, snapshot) {
            if (!snapshot.hasData) {
              return const CircularProgressIndicator();
            }
            print(snapshot.data!);
            return Text(
              snapshot.data!.toString(),
              style: const TextStyle(fontSize: 24),
            );
          },
        ),
      ),
      floatingActionButton: const FloatingActionButton(
        onPressed: createCategoryAndTodo,
        child: Icon(Icons.add),
      ),
    );
  }
}

Demonstration video:

Screen.Recording.2024-11-13.at.8.41.23.PM.mp4

Furthermore, testing combineLatest2 without drift seems to work as expected (the values are emitted in the same frame):

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';

void main() {
  runApp(const MyApp());
}

class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    return const MaterialApp(
      home: HomePage(),
    );
  }
}

class HomePage extends StatefulWidget {
  const HomePage({super.key});

  @override
  HomePageState createState() => HomePageState();
}

class HomePageState extends State<HomePage> {
  // Create two Subjects for the streams
  final BehaviorSubject<int> _stream1 = BehaviorSubject<int>.seeded(0);
  final BehaviorSubject<int> _stream2 = BehaviorSubject<int>.seeded(0);
  late Stream<String> _combinedStream;

  @override
  void initState() {
    super.initState();
    // Initialize the combined stream
    _combinedStream = Rx.combineLatest2<int, int, String>(
      _stream1.stream,
      _stream2.stream,
      (value1, value2) => 'Stream 1: $value1, Stream 2: $value2',
    );
  }

  @override
  void dispose() {
    // Close the streams to free resources
    _stream1.close();
    _stream2.close();
    super.dispose();
  }

  void _emitValues() {
    // Emit new values to both streams when the button is pressed
    _stream1.add(_stream1.value + 1);
    _stream2.add(_stream2.value + 2);
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('CombineLatest2 Example')),
      body: Center(
        child: StreamBuilder<String>(
          stream: _combinedStream,
          builder: (context, snapshot) {
            if (!snapshot.hasData) {
              return const CircularProgressIndicator();
            }
            print(snapshot.data!);
            return Text(
              snapshot.data!,
              style: const TextStyle(fontSize: 24),
            );
          },
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _emitValues,
        child: const Icon(Icons.add),
      ),
    );
  }
}
@simolus3
Owner

Thank you for the detailed description, this is a good catch!

When a transaction completes, drift invalidates all queries on any table affected by the transaction. When there are lots of pending queries, they might take a while to complete, and since they run on a background isolate, it's perfectly valid for them to complete at different times.
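
The timing described above can be reproduced without drift. The following is a minimal sketch in plain Dart, not drift's implementation: `watchQuery`, `combineLatest`, and `simulate` are hypothetical names, the two integers stand in for table contents, and the 10 ms and 30 ms delays are arbitrary stand-ins for queries that finish at different times.

```dart
import 'dart:async';

/// Hypothetical stand-in for a watched query: re-runs [query] on every
/// invalidation and emits the result once the query has completed.
Stream<T> watchQuery<T>(
  Stream<void> invalidations,
  Future<T> Function() query,
) =>
    invalidations.asyncMap((_) => query());

/// Minimal two-stream combineLatest (same idea as Rx.combineLatest2):
/// emits a pair whenever either source emits, once both have a value.
Stream<(A, B)> combineLatest<A, B>(Stream<A> a, Stream<B> b) {
  final out = StreamController<(A, B)>();
  A? lastA;
  B? lastB;
  var hasA = false, hasB = false;

  void emit() {
    if (hasA && hasB) out.add((lastA as A, lastB as B));
  }

  final subs = [
    a.listen((v) {
      lastA = v;
      hasA = true;
      emit();
    }),
    b.listen((v) {
      lastB = v;
      hasB = true;
      emit();
    }),
  ];
  out.onCancel = () => Future.wait(subs.map((s) => s.cancel()));
  return out.stream;
}

Future<List<(int, int)>> simulate() async {
  // "Committed" state of two tables, modelled as row counts.
  var categories = 0;
  var items = 0;
  final invalidations = StreamController<void>.broadcast();

  // Both queries are invalidated together but finish at different times.
  final categoryStream = watchQuery(invalidations.stream, () async {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    return categories;
  });
  final itemStream = watchQuery(invalidations.stream, () async {
    await Future<void>.delayed(const Duration(milliseconds: 30));
    return items;
  });

  final seen = <(int, int)>[];
  final sub = combineLatest(categoryStream, itemStream).listen(seen.add);

  invalidations.add(null); // initial load
  await Future<void>.delayed(const Duration(milliseconds: 100));

  // A "transaction" commits both writes at once, then invalidates once.
  categories = 1;
  items = 1;
  invalidations.add(null);
  await Future<void>.delayed(const Duration(milliseconds: 100));

  await sub.cancel();
  unawaited(invalidations.close());
  return seen;
}

Future<void> main() async {
  print(await simulate());
}
```

With these delays the combined stream delivers (0, 0), then (1, 0), then (1, 1). The middle pair is the "category without items" state from the report, even though both writes were committed together.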

I know that this can violate some correctness assumptions, but it is generally working as intended (since you're not running the queries in a transaction, they can yield inconsistent results).
The problem is that drift makes it really hard to get this right (as is evident from the docs showing a broken example). I think we should have an API that lets the user express "these n streams must always emit consistent results", since doing this across all streams by default would make them quite slow.

Maybe it could look something like this:

late categories, todos;

final stream = database.streamGroup((builder) {
  categories = builder.add(database.managers.todoCategory.watch());
  todos = builder.add(database.managers.todoItems.watch());
});

return stream.watch((results) {
  return (todoCategories: results[categories], todoItems: results[todos]);
});

That API design isn't great, but we need something that's reasonably type-safe, supports an arbitrary number of streams, and delivers all updates from the source streams in a single stream. (I'm not sure that emitting them in the same frame from different streams is a sound design: for example, what happens if you pause some of the streams? It introduces a weird synchronous pairing between different streams.)
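
The "single stream" idea can be illustrated in plain Dart. This is only a sketch under assumptions, not the proposed `streamGroup` API and not drift code: `watchTogether` and `simulateGrouped` are hypothetical names, and capturing one snapshot of two integers stands in for running all grouped queries inside one read transaction.

```dart
import 'dart:async';

/// Hypothetical helper, not a drift API: re-runs [loadAll] on every
/// invalidation and emits its result as a single event.
Stream<R> watchTogether<R>(
  Stream<void> invalidations,
  Future<R> Function() loadAll,
) =>
    invalidations.asyncMap((_) => loadAll());

Future<List<(int, int)>> simulateGrouped() async {
  // "Committed" state of two tables, modelled as row counts.
  var categories = 0;
  var items = 0;
  final invalidations = StreamController<void>();

  // Stand-in for reading both tables in one transaction: capture one
  // snapshot of the state first, then answer both "queries" from it.
  Future<(int, int)> loadAll() async {
    final snapshot = (categories, items);
    await Future<void>.delayed(const Duration(milliseconds: 30));
    return snapshot;
  }

  final seen = <(int, int)>[];
  final sub = watchTogether(invalidations.stream, loadAll).listen(seen.add);

  invalidations.add(null); // initial load
  await Future<void>.delayed(const Duration(milliseconds: 100));

  // Both writes are committed together, followed by one invalidation.
  categories = 1;
  items = 1;
  invalidations.add(null);
  await Future<void>.delayed(const Duration(milliseconds: 100));

  await sub.cancel();
  unawaited(invalidations.close());
  return seen;
}

Future<void> main() async {
  print(await simulateGrouped());
}
```

Here listeners only ever see (0, 0) and then (1, 1), because each event carries results that were read together. The trade-off in this sketch is that every event waits for the whole group to load, which fits the point in this thread that such grouping has a cost and would likely need to be opt-in.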

Let me know if you have ideas on how to solve this as well, but I don't see a way to not make this opt-in for some streams since it comes with a performance penalty for managing the transaction.
