Skip to content

Commit

Permalink
Connection: Fix a race condition in direct chats handling upon initia…
Browse files Browse the repository at this point in the history
…l sync

Closes #323.
  • Loading branch information
KitsuneRal committed May 19, 2019
1 parent 92f10dc commit 432dc68
Showing 1 changed file with 75 additions and 54 deletions.
129 changes: 75 additions & 54 deletions lib/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ class Connection::Private
: data(move(connection))
{ }
Q_DISABLE_COPY(Private)
Private(Private&&) = delete;
Private operator=(Private&&) = delete;
DISABLE_MOVE(Private)

Connection* q = nullptr;
std::unique_ptr<ConnectionData> data;
Expand All @@ -91,6 +90,10 @@ class Connection::Private
QMap<QString, User*> userMap;
DirectChatsMap directChats;
DirectChatUsersMap directChatUsers;
// The below two variables track local changes between sync completions.
// See also: https://github.com/QMatrixClient/libqmatrixclient/wiki/Handling-direct-chat-events
DirectChatsMap dcLocalAdditions;
DirectChatsMap dcLocalRemovals;
std::unordered_map<QString, EventPtr> accountData;
QString userId;
int syncLoopTimeout = -1;
Expand All @@ -107,8 +110,6 @@ class Connection::Private

void connectWithToken(const QString& user, const QString& accessToken,
const QString& deviceId);
void broadcastDirectChatUpdates(const DirectChatsMap& additions,
const DirectChatsMap& removals);

template <typename EventT>
EventT* unpackAccountData() const
Expand Down Expand Up @@ -373,6 +374,20 @@ void Connection::syncLoop(int timeout)
syncLoopIteration(); // initial sync to start the loop
}

QJsonObject toJson(const Connection::DirectChatsMap& directChats)
{
QJsonObject json;
for (auto it = directChats.begin(); it != directChats.end();)
{
QJsonArray roomIds;
const auto* user = it.key();
for (; it != directChats.end() && it.key() == user; ++it)
roomIds.append(*it);
json.insert(user->id(), roomIds);
}
return json;
}

void Connection::onSyncSuccess(SyncData &&data, bool fromCache) {
d->data->setLastEvent(data.nextBatch());
for (auto&& roomData: data.takeRoomData())
Expand Down Expand Up @@ -409,33 +424,43 @@ void Connection::onSyncSuccess(SyncData &&data, bool fromCache) {
}
// After running this loop, the account data events not saved in
// d->accountData (see the end of the loop body) are auto-cleaned away
for (auto& eventPtr: data.takeAccountData())
for (auto& eventPtr : data.takeAccountData())
{
visit(*eventPtr,
[this](const DirectChatEvent& dce) {
// See https://github.com/QMatrixClient/libqmatrixclient/wiki/Handling-direct-chat-events
const auto& usersToDCs = dce.usersToDirectChats();
DirectChatsMap removals =
erase_if(d->directChats, [&usersToDCs](auto it) {
return !usersToDCs.contains(it.key()->id(),
it.value());
});
erase_if(d->directChatUsers, [&usersToDCs](auto it) {
return !usersToDCs.contains(it.value()->id(), it.key());
DirectChatsMap remoteRemovals =
erase_if(d->directChats, [&usersToDCs, this](auto it) {
return !(usersToDCs.contains(it.key()->id(), it.value())
|| d->dcLocalAdditions.contains(it.key(),
it.value()));
});
erase_if(d->directChatUsers, [&remoteRemovals](auto it) {
return remoteRemovals.contains(it.value(), it.key());
});
// Remove from dcLocalRemovals what the server already has.
erase_if(d->dcLocalRemovals, [&remoteRemovals](auto it) {
return remoteRemovals.contains(it.key(), it.value());
});
if (MAIN().isDebugEnabled())
for (auto it = removals.begin(); it != removals.end();
++it)
qCDebug(MAIN) << it.value()
<< "is no more a direct chat with"
<< it.key()->id();

DirectChatsMap additions;
for (auto it = remoteRemovals.begin();
it != remoteRemovals.end(); ++it) {
qCDebug(MAIN)
<< it.value() << "is no more a direct chat with"
<< it.key()->id();
}

DirectChatsMap remoteAdditions;
for (auto it = usersToDCs.begin(); it != usersToDCs.end();
++it) {
if (auto* u = user(it.key())) {
if (!d->directChats.contains(u, it.value())) {
Q_ASSERT(!d->directChatUsers.contains(it.value(),
u));
additions.insert(u, it.value());
if (!d->directChats.contains(u, it.value())
&& !d->dcLocalRemovals.contains(u, it.value()))
{
Q_ASSERT(
!d->directChatUsers.contains(it.value(), u));
remoteAdditions.insert(u, it.value());
d->directChats.insert(u, it.value());
d->directChatUsers.insert(it.value(), u);
qCDebug(MAIN)
Expand All @@ -446,8 +471,12 @@ void Connection::onSyncSuccess(SyncData &&data, bool fromCache) {
qCWarning(MAIN) << "Couldn't get a user object for"
<< it.key();
}
if (!additions.isEmpty() || !removals.isEmpty())
emit directChatsListChanged(additions, removals);
// Remove from dcLocalAdditions what the server already has.
erase_if(d->dcLocalAdditions, [&remoteAdditions](auto it) {
return remoteAdditions.contains(it.key(), it.value());
});
if (!remoteAdditions.isEmpty() || !remoteRemovals.isEmpty())
emit directChatsListChanged(remoteAdditions, remoteRemovals);
},
// catch-all, passing eventPtr for a possible take-over
[this, &eventPtr](const Event& accountEvent) {
Expand All @@ -468,6 +497,16 @@ void Connection::onSyncSuccess(SyncData &&data, bool fromCache) {
emit accountDataChanged(currentData->matrixType());
}
});
}
if (!d->dcLocalAdditions.isEmpty() || !d->dcLocalRemovals.isEmpty()) {
qDebug(MAIN) << "Sending updated direct chats to the server:"
<< d->dcLocalRemovals.size() << "removal(s),"
<< d->dcLocalAdditions.size() << "addition(s)";
callApi<SetAccountDataJob>(d->userId, QStringLiteral("m.direct"),
toJson(d->directChats));
d->dcLocalAdditions.clear();
d->dcLocalRemovals.clear();
}
}

void Connection::stopSync()
Expand Down Expand Up @@ -662,8 +701,8 @@ void Connection::doInDirectChat(User* u,
{
Q_ASSERT(u);
const auto& userId = u->id();
// There can be more than one DC; find the first valid, and delete invalid
// (left/forgotten) ones along the way.
// There can be more than one DC; find the first valid (existing and
// not left), and delete inexistent (forgotten?) ones along the way.
DirectChatsMap removals;
for (auto it = d->directChats.find(u);
it != d->directChats.end() && it.key() == u; ++it)
Expand Down Expand Up @@ -700,6 +739,8 @@ void Connection::doInDirectChat(User* u,
<< roomId << "is not valid and will be discarded";
// Postpone actual deletion until we finish iterating d->directChats.
removals.insert(it.key(), it.value());
// Add to the list of updates to send to the server upon the next sync.
d->dcLocalRemovals.insert(it.key(), it.value());
}
if (!removals.isEmpty())
{
Expand All @@ -709,7 +750,7 @@ void Connection::doInDirectChat(User* u,
d->directChatUsers.remove(it.value(),
const_cast<User*>(it.key())); // FIXME
}
d->broadcastDirectChatUpdates({}, removals);
emit directChatsListChanged({}, removals);
}

auto j = createDirectChat(userId);
Expand Down Expand Up @@ -1010,28 +1051,6 @@ Connection::DirectChatsMap Connection::directChats() const
return d->directChats;
}

QJsonObject toJson(const Connection::DirectChatsMap& directChats)
{
QJsonObject json;
for (auto it = directChats.begin(); it != directChats.end();)
{
QJsonArray roomIds;
const auto* user = it.key();
for (; it != directChats.end() && it.key() == user; ++it)
roomIds.append(*it);
json.insert(user->id(), roomIds);
}
return json;
}

void Connection::Private::broadcastDirectChatUpdates(const DirectChatsMap& additions,
const DirectChatsMap& removals)
{
q->callApi<SetAccountDataJob>(userId, QStringLiteral("m.direct"),
toJson(directChats));
emit q->directChatsListChanged(additions, removals);
}

void Connection::addToDirectChats(const Room* room, User* user)
{
Q_ASSERT(room != nullptr && user != nullptr);
Expand All @@ -1040,8 +1059,8 @@ void Connection::addToDirectChats(const Room* room, User* user)
Q_ASSERT(!d->directChatUsers.contains(room->id(), user));
d->directChats.insert(user, room->id());
d->directChatUsers.insert(room->id(), user);
DirectChatsMap additions { { user, room->id() } };
d->broadcastDirectChatUpdates(additions, {});
d->dcLocalAdditions.insert(user, room->id());
emit directChatsListChanged({ { user, room->id() } }, {});
}

void Connection::removeFromDirectChats(const QString& roomId, User* user)
Expand All @@ -1054,15 +1073,17 @@ void Connection::removeFromDirectChats(const QString& roomId, User* user)
DirectChatsMap removals;
if (user != nullptr)
{
removals.insert(user, roomId);
d->directChats.remove(user, roomId);
d->directChatUsers.remove(roomId, user);
removals.insert(user, roomId);
d->dcLocalRemovals.insert(user, roomId);
} else {
removals = erase_if(d->directChats,
[&roomId] (auto it) { return it.value() == roomId; });
d->directChatUsers.remove(roomId);
d->dcLocalRemovals += removals;
}
d->broadcastDirectChatUpdates({}, removals);
emit directChatsListChanged({}, removals);
}

bool Connection::isDirectChat(const QString& roomId) const
Expand Down

0 comments on commit 432dc68

Please sign in to comment.