From 73340b94e3dbcc67da8e52ea5ddea06298c2c6d4 Mon Sep 17 00:00:00 2001 From: Anton Popovichenko Date: Thu, 28 Mar 2024 12:57:29 +0100 Subject: [PATCH] feat(Core/Compression): Move packets compression from map to network thread (#18602) * feat(Code/Compression): Move packets compression from map to network thread. * Code style fix * Remove unicode letter --- .../Battlegrounds/Zones/BattlegroundSA.cpp | 6 +- .../game/Entities/GameObject/GameObject.cpp | 2 +- src/server/game/Entities/Object/Object.cpp | 2 +- .../Entities/Object/Updates/UpdateData.cpp | 97 ++----------------- .../game/Entities/Object/Updates/UpdateData.h | 4 +- .../game/Entities/Player/PlayerUpdates.cpp | 4 +- src/server/game/Entities/Unit/Unit.cpp | 2 +- .../game/Grids/Notifiers/GridNotifiers.cpp | 2 +- src/server/game/Groups/Group.cpp | 4 +- src/server/game/Maps/Map.cpp | 14 +-- src/server/game/Server/WorldSocket.cpp | 85 +++++++++++++++- src/server/game/Server/WorldSocket.h | 12 ++- .../SunwellPlateau/boss_brutallus.cpp | 4 +- src/server/scripts/Spells/spell_priest.cpp | 2 +- 14 files changed, 122 insertions(+), 118 deletions(-) diff --git a/src/server/game/Battlegrounds/Zones/BattlegroundSA.cpp b/src/server/game/Battlegrounds/Zones/BattlegroundSA.cpp index 8f16108f8315b8..ee160c6e832889 100644 --- a/src/server/game/Battlegrounds/Zones/BattlegroundSA.cpp +++ b/src/server/game/Battlegrounds/Zones/BattlegroundSA.cpp @@ -305,7 +305,7 @@ void BattlegroundSA::StartShips() UpdateData data; WorldPacket pkt; GetBGObject(i)->BuildValuesUpdateBlockForPlayer(&data, itr->second); - data.BuildPacket(&pkt); + data.BuildPacket(pkt); itr->second->GetSession()->SendPacket(&pkt); } } @@ -1106,7 +1106,7 @@ void BattlegroundSA::SendTransportInit(Player* player) if (BgObjects[BG_SA_BOAT_TWO]) GetBGObject(BG_SA_BOAT_TWO)->BuildCreateUpdateBlockForPlayer(&transData, player); WorldPacket packet; - transData.BuildPacket(&packet); + transData.BuildPacket(packet); player->GetSession()->SendPacket(&packet); } } @@ -1121,7 +1121,7 @@ void BattlegroundSA::SendTransportsRemove(Player* player) if (BgObjects[BG_SA_BOAT_TWO]) GetBGObject(BG_SA_BOAT_TWO)->BuildOutOfRangeUpdateBlock(&transData); WorldPacket packet; - transData.BuildPacket(&packet); + transData.BuildPacket(packet); player->GetSession()->SendPacket(&packet); } } diff --git a/src/server/game/Entities/GameObject/GameObject.cpp b/src/server/game/Entities/GameObject/GameObject.cpp index a488acc9325ef0..7f9886aa49816e 100644 --- a/src/server/game/Entities/GameObject/GameObject.cpp +++ b/src/server/game/Entities/GameObject/GameObject.cpp @@ -512,7 +512,7 @@ void GameObject::Update(uint32 diff) UpdateData udata; WorldPacket packet; BuildValuesUpdateBlockForPlayer(&udata, caster->ToPlayer()); - udata.BuildPacket(&packet); + udata.BuildPacket(packet); caster->ToPlayer()->GetSession()->SendPacket(&packet); SendCustomAnim(GetGoAnimProgress()); diff --git a/src/server/game/Entities/Object/Object.cpp b/src/server/game/Entities/Object/Object.cpp index 368f814ae5e683..a831a08048cf36 100644 --- a/src/server/game/Entities/Object/Object.cpp +++ b/src/server/game/Entities/Object/Object.cpp @@ -250,7 +250,7 @@ void Object::SendUpdateToPlayer(Player* player) WorldPacket packet; BuildCreateUpdateBlockForPlayer(&upd, player); - upd.BuildPacket(&packet); + upd.BuildPacket(packet); player->GetSession()->SendPacket(&packet); } diff --git a/src/server/game/Entities/Object/Updates/UpdateData.cpp b/src/server/game/Entities/Object/Updates/UpdateData.cpp index a452bc66af2df3..00c565c665e5c9 100644 --- a/src/server/game/Entities/Object/Updates/UpdateData.cpp +++ b/src/server/game/Entities/Object/Updates/UpdateData.cpp @@ -22,7 +22,6 @@ #include "Opcodes.h" #include "World.h" #include "WorldPacket.h" -#include "zlib.h" UpdateData::UpdateData() : m_blockCount(0) { @@ -46,103 +45,25 @@ void UpdateData::AddUpdateBlock(const UpdateData& block) m_blockCount += block.m_blockCount; } -void UpdateData::Compress(void* dst, uint32* dst_size, void* src, int src_size) +bool UpdateData::BuildPacket(WorldPacket& packet) { - z_stream c_stream; + ASSERT(packet.empty()); - c_stream.zalloc = (alloc_func)0; - c_stream.zfree = (free_func)0; - c_stream.opaque = (voidpf)0; + packet.reserve(4 + (m_outOfRangeGUIDs.empty() ? 0 : 1 + 4 + 9 * m_outOfRangeGUIDs.size()) + m_data.wpos()); - // default Z_BEST_SPEED (1) - int z_res = deflateInit(&c_stream, sWorld->getIntConfig(CONFIG_COMPRESSION)); - if (z_res != Z_OK) - { - LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateInit) Error code: {} ({})", z_res, zError(z_res)); - *dst_size = 0; - return; - } - - c_stream.next_out = (Bytef*)dst; - c_stream.avail_out = *dst_size; - c_stream.next_in = (Bytef*)src; - c_stream.avail_in = (uInt)src_size; - - z_res = deflate(&c_stream, Z_NO_FLUSH); - if (z_res != Z_OK) - { - LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate) Error code: {} ({})", z_res, zError(z_res)); - *dst_size = 0; - return; - } - - if (c_stream.avail_in != 0) - { - LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate not greedy)"); - *dst_size = 0; - return; - } - - z_res = deflate(&c_stream, Z_FINISH); - if (z_res != Z_STREAM_END) - { - LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate should report Z_STREAM_END instead {} ({})", z_res, zError(z_res)); - *dst_size = 0; - return; - } - - z_res = deflateEnd(&c_stream); - if (z_res != Z_OK) - { - LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateEnd) Error code: {} ({})", z_res, zError(z_res)); - *dst_size = 0; - return; - } - - *dst_size = c_stream.total_out; -} - -bool UpdateData::BuildPacket(WorldPacket* packet) -{ - ASSERT(packet->empty()); // shouldn't happen - - ByteBuffer buf(4 + (m_outOfRangeGUIDs.empty() ? 0 : 1 + 4 + 9 * m_outOfRangeGUIDs.size()) + m_data.wpos()); - - buf << (uint32) (!m_outOfRangeGUIDs.empty() ? m_blockCount + 1 : m_blockCount); + packet << (uint32) (!m_outOfRangeGUIDs.empty() ? m_blockCount + 1 : m_blockCount); if (!m_outOfRangeGUIDs.empty()) { - buf << (uint8) UPDATETYPE_OUT_OF_RANGE_OBJECTS; - buf << (uint32) m_outOfRangeGUIDs.size(); + packet << (uint8) UPDATETYPE_OUT_OF_RANGE_OBJECTS; + packet << (uint32) m_outOfRangeGUIDs.size(); for (ObjectGuid const& guid : m_outOfRangeGUIDs) - { - buf << guid.WriteAsPacked(); - } + packet << guid.WriteAsPacked(); } - buf.append(m_data); - - size_t pSize = buf.wpos(); // use real used data size - - if (pSize > 100) // compress large packets - { - uint32 destsize = compressBound(pSize); - packet->resize(destsize + sizeof(uint32)); - - packet->put(0, pSize); - Compress(const_cast(packet->contents()) + sizeof(uint32), &destsize, (void*)buf.contents(), pSize); - if (destsize == 0) - return false; - - packet->resize(destsize + sizeof(uint32)); - packet->SetOpcode(SMSG_COMPRESSED_UPDATE_OBJECT); - } - else // send small packets without compression - { - packet->append(buf); - packet->SetOpcode(SMSG_UPDATE_OBJECT); - } + packet.append(m_data); + packet.SetOpcode(SMSG_UPDATE_OBJECT); return true; } diff --git a/src/server/game/Entities/Object/Updates/UpdateData.h b/src/server/game/Entities/Object/Updates/UpdateData.h index 4b8d9b7a4d5b8f..6a795bd7e0b27a 100644 --- a/src/server/game/Entities/Object/Updates/UpdateData.h +++ b/src/server/game/Entities/Object/Updates/UpdateData.h @@ -56,7 +56,7 @@ class UpdateData void AddOutOfRangeGUID(ObjectGuid guid); void AddUpdateBlock(const ByteBuffer& block); void AddUpdateBlock(const UpdateData& block); - bool BuildPacket(WorldPacket* packet); + bool BuildPacket(WorldPacket& packet); [[nodiscard]] bool HasData() const { return m_blockCount > 0 || !m_outOfRangeGUIDs.empty(); } void Clear(); @@ -64,7 +64,5 @@ class UpdateData uint32 m_blockCount; GuidVector m_outOfRangeGUIDs; ByteBuffer m_data; - - void Compress(void* dst, uint32* dst_size, void* src, int src_size); }; #endif diff --git a/src/server/game/Entities/Player/PlayerUpdates.cpp b/src/server/game/Entities/Player/PlayerUpdates.cpp index e56e3e3dc1cf2c..7bcb5e5674e843 100644 --- a/src/server/game/Entities/Player/PlayerUpdates.cpp +++ b/src/server/game/Entities/Player/PlayerUpdates.cpp @@ -1739,7 +1739,7 @@ void Player::UpdateTriggerVisibility() if (!udata.HasData()) return; - udata.BuildPacket(&packet); + udata.BuildPacket(packet); GetSession()->SendPacket(&packet); } @@ -1791,7 +1791,7 @@ void Player::UpdateForQuestWorldObjects() } } - udata.BuildPacket(&packet); + udata.BuildPacket(packet); GetSession()->SendPacket(&packet); } diff --git a/src/server/game/Entities/Unit/Unit.cpp b/src/server/game/Entities/Unit/Unit.cpp index f7576fbdf58e1f..f89793acd89e41 100644 --- a/src/server/game/Entities/Unit/Unit.cpp +++ b/src/server/game/Entities/Unit/Unit.cpp @@ -10595,7 +10595,7 @@ void Unit::SetOwnerGUID(ObjectGuid owner) UpdateData udata; WorldPacket packet; BuildValuesUpdateBlockForPlayer(&udata, player); - udata.BuildPacket(&packet); + udata.BuildPacket(packet); player->SendDirectMessage(&packet); RemoveFieldNotifyFlag(UF_FLAG_OWNER); diff --git a/src/server/game/Grids/Notifiers/GridNotifiers.cpp b/src/server/game/Grids/Notifiers/GridNotifiers.cpp index d69c9386bec793..db2ad0a6744660 100644 --- a/src/server/game/Grids/Notifiers/GridNotifiers.cpp +++ b/src/server/game/Grids/Notifiers/GridNotifiers.cpp @@ -99,7 +99,7 @@ void VisibleNotifier::SendToSelf() return; WorldPacket packet; - i_data.BuildPacket(&packet); + i_data.BuildPacket(packet); i_player.GetSession()->SendPacket(&packet); for (std::vector::const_iterator it = i_visibleNow.begin(); it != i_visibleNow.end(); ++it) diff --git a/src/server/game/Groups/Group.cpp b/src/server/game/Groups/Group.cpp index f6563042ef1b9c..8db5f3740f3174 100644 --- a/src/server/game/Groups/Group.cpp +++ b/src/server/game/Groups/Group.cpp @@ -520,7 +520,7 @@ bool Group::AddMember(Player* player) player->BuildValuesUpdateBlockForPlayer(&newData, itrMember); if (newData.HasData()) { - newData.BuildPacket(&newDataPacket); + newData.BuildPacket(newDataPacket); itrMember->SendDirectMessage(&newDataPacket); } } @@ -529,7 +529,7 @@ bool Group::AddMember(Player* player) if (groupData.HasData()) { - groupData.BuildPacket(&groupDataPacket); + groupData.BuildPacket(groupDataPacket); player->SendDirectMessage(&groupDataPacket); } diff --git a/src/server/game/Maps/Map.cpp b/src/server/game/Maps/Map.cpp index 905c6632ecb7eb..8eecf7ced1f3c3 100644 --- a/src/server/game/Maps/Map.cpp +++ b/src/server/game/Maps/Map.cpp @@ -644,7 +644,7 @@ bool Map::AddToMap(MotionTransport* obj, bool /*checkTransport*/) UpdateData data; obj->BuildCreateUpdateBlockForPlayer(&data, itr->GetSource()); WorldPacket packet; - data.BuildPacket(&packet); + data.BuildPacket(packet); itr->GetSource()->SendDirectMessage(&packet); } } @@ -971,7 +971,7 @@ void Map::RemoveFromMap(MotionTransport* obj, bool remove) UpdateData data; obj->BuildOutOfRangeUpdateBlock(&data); WorldPacket packet; - data.BuildPacket(&packet); + data.BuildPacket(packet); for (Map::PlayerList::const_iterator itr = players.begin(); itr != players.end(); ++itr) if (itr->GetSource()->GetTransport() != obj) itr->GetSource()->SendDirectMessage(&packet); @@ -2536,7 +2536,7 @@ void Map::SendInitSelf(Player* player) player->BuildCreateUpdateBlockForPlayer(&data, player); // build and send self update packet before sending to player his own auras - data.BuildPacket(&packet); + data.BuildPacket(packet); player->SendDirectMessage(&packet); // send to player his own auras (this is needed here for timely initialization of some fields on client) @@ -2552,7 +2552,7 @@ void Map::SendInitSelf(Player* player) if (player != (*itr) && player->HaveAtClient(*itr)) (*itr)->BuildCreateUpdateBlockForPlayer(&data, player); - data.BuildPacket(&packet); + data.BuildPacket(packet); player->SendDirectMessage(&packet); } @@ -2565,7 +2565,7 @@ void Map::SendInitTransports(Player* player) (*itr)->BuildCreateUpdateBlockForPlayer(&transData, player); WorldPacket packet; - transData.BuildPacket(&packet); + transData.BuildPacket(packet); player->GetSession()->SendPacket(&packet); } @@ -2590,7 +2590,7 @@ void Map::SendRemoveTransports(Player* player) } WorldPacket packet; - transData.BuildPacket(&packet); + transData.BuildPacket(packet); player->GetSession()->SendPacket(&packet); } @@ -2621,7 +2621,7 @@ void Map::SendObjectUpdates() WorldPacket packet; // here we allocate a std::vector with a size of 0x10000 for (UpdateDataMapType::iterator iter = update_players.begin(); iter != update_players.end(); ++iter) { - iter->second.BuildPacket(&packet); + iter->second.BuildPacket(packet); iter->first->GetSession()->SendPacket(&packet); packet.clear(); // clean the string } diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp index f6317b5049f95a..05235bc6fad440 100644 --- a/src/server/game/Server/WorldSocket.cpp +++ b/src/server/game/Server/WorldSocket.cpp @@ -31,9 +31,88 @@ #include "World.h" #include "WorldSession.h" #include +#include "zlib.h" using boost::asio::ip::tcp; +void compressBuff(void* dst, uint32* dst_size, void* src, int src_size) +{ + z_stream c_stream; + + c_stream.zalloc = (alloc_func)0; + c_stream.zfree = (free_func)0; + c_stream.opaque = (voidpf)0; + + // default Z_BEST_SPEED (1) + int z_res = deflateInit(&c_stream, sWorld->getIntConfig(CONFIG_COMPRESSION)); + if (z_res != Z_OK) + { + LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateInit) Error code: {} ({})", z_res, zError(z_res)); + *dst_size = 0; + return; + } + + c_stream.next_out = (Bytef*)dst; + c_stream.avail_out = *dst_size; + c_stream.next_in = (Bytef*)src; + c_stream.avail_in = (uInt)src_size; + + z_res = deflate(&c_stream, Z_NO_FLUSH); + if (z_res != Z_OK) + { + LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate) Error code: {} ({})", z_res, zError(z_res)); + *dst_size = 0; + return; + } + + if (c_stream.avail_in != 0) + { + LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate not greedy)"); + *dst_size = 0; + return; + } + + z_res = deflate(&c_stream, Z_FINISH); + if (z_res != Z_STREAM_END) + { + LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate should report Z_STREAM_END instead {} ({})", z_res, zError(z_res)); + *dst_size = 0; + return; + } + + z_res = deflateEnd(&c_stream); + if (z_res != Z_OK) + { + LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateEnd) Error code: {} ({})", z_res, zError(z_res)); + *dst_size = 0; + return; + } + + *dst_size = c_stream.total_out; +} + +void EncryptableAndCompressiblePacket::CompressIfNeeded() +{ + if (!NeedsCompression()) + return; + + uint32 pSize = size(); + + uint32 destsize = compressBound(pSize); + ByteBuffer buf(destsize + sizeof(uint32)); + buf.resize(destsize + sizeof(uint32)); + + buf.put(0, pSize); + compressBuff(const_cast(buf.contents()) + sizeof(uint32), &destsize, (void*)contents(), pSize); + if (destsize == 0) + return; + + buf.resize(destsize + sizeof(uint32)); + + ByteBuffer::operator=(std::move(buf)); + SetOpcode(SMSG_COMPRESSED_UPDATE_OBJECT); +} + WorldSocket::WorldSocket(tcp::socket&& socket) : Socket(std::move(socket)), _OverSpeedPings(0), _worldSession(nullptr), _authed(false), _sendBufferSize(4096) { @@ -81,10 +160,12 @@ void WorldSocket::CheckIpCallback(PreparedQueryResult result) bool WorldSocket::Update() { - EncryptablePacket* queued; + EncryptableAndCompressiblePacket* queued; MessageBuffer buffer(_sendBufferSize); while (_bufferQueue.Dequeue(queued)) { + queued->CompressIfNeeded(); + ServerPktHeader header(queued->size() + 2, queued->GetOpcode()); if (queued->NeedsEncryption()) _authCrypt.EncryptSend(header.header, header.getHeaderLength()); @@ -427,7 +508,7 @@ void WorldSocket::SendPacket(WorldPacket const& packet) if (sPacketLog->CanLogPacket()) sPacketLog->LogPacket(packet, SERVER_TO_CLIENT, GetRemoteIpAddress(), GetRemotePort()); - _bufferQueue.Enqueue(new EncryptablePacket(packet, _authCrypt.IsInitialized())); + _bufferQueue.Enqueue(new EncryptableAndCompressiblePacket(packet, _authCrypt.IsInitialized())); } void WorldSocket::HandleAuthSession(WorldPacket & recvPacket) diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h index dbad55fb5be08b..e1bb554d6af877 100644 --- a/src/server/game/Server/WorldSocket.h +++ b/src/server/game/Server/WorldSocket.h @@ -30,17 +30,21 @@ using boost::asio::ip::tcp; -class EncryptablePacket : public WorldPacket +class EncryptableAndCompressiblePacket : public WorldPacket { public: - EncryptablePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt) + EncryptableAndCompressiblePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt) { SocketQueueLink.store(nullptr, std::memory_order_relaxed); } bool NeedsEncryption() const { return _encrypt; } - std::atomic SocketQueueLink; + bool NeedsCompression() const { return GetOpcode() == SMSG_UPDATE_OBJECT && size() > 100; } + + void CompressIfNeeded(); + + std::atomic SocketQueueLink; private: bool _encrypt; @@ -125,7 +129,7 @@ class AC_GAME_API WorldSocket : public Socket MessageBuffer _headerBuffer; MessageBuffer _packetBuffer; - MPSCQueue _bufferQueue; + MPSCQueue _bufferQueue; std::size_t _sendBufferSize; QueryCallbackProcessor _queryProcessor; diff --git a/src/server/scripts/EasternKingdoms/SunwellPlateau/boss_brutallus.cpp b/src/server/scripts/EasternKingdoms/SunwellPlateau/boss_brutallus.cpp index 3124551d641e5b..c368be3e55f9ed 100644 --- a/src/server/scripts/EasternKingdoms/SunwellPlateau/boss_brutallus.cpp +++ b/src/server/scripts/EasternKingdoms/SunwellPlateau/boss_brutallus.cpp @@ -430,7 +430,7 @@ class spell_madrigosa_activate_barrier : public SpellScriptLoader UpdateData data; WorldPacket pkt; go->BuildValuesUpdateBlockForPlayer(&data, i->GetSource()); - data.BuildPacket(&pkt); + data.BuildPacket(pkt); i->GetSource()->GetSession()->SendPacket(&pkt); } } @@ -473,7 +473,7 @@ class spell_madrigosa_deactivate_barrier : public SpellScriptLoader UpdateData data; WorldPacket pkt; go->BuildValuesUpdateBlockForPlayer(&data, i->GetSource()); - data.BuildPacket(&pkt); + data.BuildPacket(pkt); i->GetSource()->GetSession()->SendPacket(&pkt); } } diff --git a/src/server/scripts/Spells/spell_priest.cpp b/src/server/scripts/Spells/spell_priest.cpp index 116b5cd2158905..04b7aebceca873 100644 --- a/src/server/scripts/Spells/spell_priest.cpp +++ b/src/server/scripts/Spells/spell_priest.cpp @@ -438,7 +438,7 @@ class spell_pri_lightwell_renew : public AuraScript UpdateData data; WorldPacket packet; caster->BuildValuesUpdateBlockForPlayer(&data, player); - data.BuildPacket(&packet); + data.BuildPacket(packet); player->SendDirectMessage(&packet); } }