Fix multi-thread emerge bug #15637

Draft · wants to merge 1 commit into base: master
200 changes: 119 additions & 81 deletions src/emerge.cpp
@@ -119,12 +119,16 @@ EmergeManager::EmergeManager(Server *server, MetricsBackend *mb)

EmergeManager::~EmergeManager()
{

m_request_stop = true;
for (u32 i = 0; i != m_threads.size(); i++) {
m_threads[i]->m_queue_event.signal();
}

for (u32 i = 0; i != m_threads.size(); i++) {
EmergeThread *thread = m_threads[i];

if (m_threads_active) {
thread->stop();
thread->signal();
thread->wait();
}

@@ -223,6 +227,9 @@ void EmergeManager::startThreads()
if (m_threads_active)
return;

m_turn_pending = 0;
m_request_stop = false;

for (u32 i = 0; i != m_threads.size(); i++)
m_threads[i]->start();

@@ -235,11 +242,12 @@ void EmergeManager::stopThreads()
if (!m_threads_active)
return;

// Request thread stop in parallel
for (u32 i = 0; i != m_threads.size(); i++) {
m_threads[i]->stop();
m_threads[i]->signal();
}
m_request_stop = true;

for (u32 i = 0; i != m_threads.size(); i++) {
m_threads[i]->m_queue_event.signal();
}


// Then do the waiting for each
for (u32 i = 0; i != m_threads.size(); i++)
@@ -278,7 +286,6 @@ bool EmergeManager::enqueueBlockEmergeEx(
EmergeCompletionCallback callback,
void *callback_param)
{
EmergeThread *thread = NULL;
bool entry_already_exists = false;

{
@@ -291,11 +298,25 @@
if (entry_already_exists)
return true;

thread = getOptimalThread();
thread->pushBlock(blockpos);
}
s16 coff = -5 / 2;
v3s16 chunk_offset(coff, coff, coff);
v3s16 chunk_pos = getContainerPos(blockpos-chunk_offset, 5);
u32 selected_queue =
(((u32)chunk_pos.X & 1) << 0) |
(((u32)chunk_pos.Y & 1) << 1) |
(((u32)chunk_pos.Z & 1) << 2);

m_queues[selected_queue].push(blockpos);

if(m_turn_waiting == 0 && m_turn_pending == m_threads.size()) {
m_current_queue = selected_queue;
m_turn_waiting = 1;

thread->signal();
for (u32 i = 0; i != m_threads.size(); i++) {
m_threads[i]->m_queue_event.signal();
}
}
}

return true;
}
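
For context, the queue selection above groups block positions by their 5-block mapchunk and uses the parity of the chunk coordinates to pick one of eight queues, so two neighbouring mapchunks always land in different queues and are never generated in the same turn. Below is a minimal, self-contained sketch of that mapping; the helper names and the main() driver are illustrative, not part of the patch.

```cpp
// Sketch only: hypothetical helper names, not the engine API.
#include <cstdint>
#include <cstdio>

struct v3s16 { int16_t X, Y, Z; };

// Component-wise floor division, like getContainerPos(p, d).
static int16_t container(int v, int d) {
    return (int16_t)((v >= 0) ? v / d : (v - d + 1) / d);
}

// Pick one of 8 queues from the parity of the block's mapchunk coordinates.
static uint32_t selectQueue(v3s16 blockpos) {
    const int chunksize = 5;
    const int coff = -chunksize / 2;              // -2, as in the patch
    int cx = container(blockpos.X - coff, chunksize);
    int cy = container(blockpos.Y - coff, chunksize);
    int cz = container(blockpos.Z - coff, chunksize);
    return ((uint32_t)cx & 1) | (((uint32_t)cy & 1) << 1) | (((uint32_t)cz & 1) << 2);
}

int main() {
    // Blocks of the same mapchunk share a queue; adjacent chunks never do.
    std::printf("%u %u\n", selectQueue({0, 0, 0}), selectQueue({5, 0, 0})); // prints "0 1"
}
```

Because neighbouring mapchunks differ in at least one coordinate parity bit, draining only one queue per turn means no two workers can be generating adjacent mapchunks at the same time.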
@@ -397,49 +418,93 @@ bool EmergeManager::pushBlockEmergeData(
}


bool EmergeManager::popBlockEmergeData(v3s16 pos, BlockEmergeData *bedata)
bool EmergeManager::popBlockEmergeData(BlockEmergeData *out_bedata, v3s16 *out_pos)
{
auto it = m_blocks_enqueued.find(pos);
if (it == m_blocks_enqueued.end())
return false;
while(true) {
auto th = dynamic_cast<EmergeThread *>(Thread::getCurrentThread());

*bedata = it->second;
if(!m_request_stop)
{
MutexAutoLock queuelock(m_queue_mutex);

auto it2 = m_peer_queue_count.find(bedata->peer_requested);
if (it2 == m_peer_queue_count.end())
return false;
assert(m_turn_pending < m_threads.size());
m_turn_pending += 1;
assert(m_turn_pending > 0);

u32 &count_peer = it2->second;
if(m_turn_waiting == 0) {
if(m_turn_pending == m_threads.size()) {
do {
m_current_queue = (m_current_queue + 1) % 8;
m_turn_waiting = m_queues[m_current_queue].size();
} while(m_turn_waiting == 0 && !m_blocks_enqueued.empty());

assert(count_peer != 0);
count_peer--;

m_blocks_enqueued.erase(it);
//errorstream << "Continue with queue " << m_current_queue <<
//" with " << m_turn_waiting << " blocks\n";

return true;
}
for (u32 i = 0; i != m_threads.size(); i++) {
m_threads[i]->m_queue_event.signal();
}
}
}
}

while(m_turn_waiting == 0) {
if(m_request_stop) {
return false;
}

EmergeThread *EmergeManager::getOptimalThread()
{
size_t nthreads = m_threads.size();
th->m_queue_event.wait();
//m_queue_pop_mutex.lock();
}

FATAL_ERROR_IF(nthreads == 0, "No emerge threads!");
if(!m_request_stop)
{
MutexAutoLock queuelock(m_queue_mutex);
m_turn_pending -= 1;

size_t index = 0;
size_t nitems_lowest = m_threads[0]->m_block_queue.size();
if(m_turn_waiting > 0) {

for (size_t i = 1; i < nthreads; i++) {
size_t nitems = m_threads[i]->m_block_queue.size();
if (nitems < nitems_lowest) {
index = i;
nitems_lowest = nitems;
}
}
auto queue = &m_queues[m_current_queue];
*out_pos = queue->front();
queue->pop();
m_turn_waiting -= 1;

//if(m_turn_waiting > 0) {
// m_queue_pop_mutex.unlock();
//}

auto it = m_blocks_enqueued.find(*out_pos);
if (it != m_blocks_enqueued.end()) {

*out_bedata = it->second;

auto it2 = m_peer_queue_count.find(out_bedata->peer_requested);
if (it2 != m_peer_queue_count.end()) {

u32 &count_peer = it2->second;

assert(count_peer != 0);
count_peer--;

m_blocks_enqueued.erase(it);

return m_threads[index];
return true;
}
}
}
}
else {

for (u32 i = 0; i != m_threads.size(); i++) {
m_threads[i]->m_queue_event.signal();
}
return false;
}
}
}
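
Taken together, popBlockEmergeData() now acts as a barrier plus round-robin scheduler: every idle worker checks in via m_turn_pending, the last arrival advances m_current_queue and publishes the number of items for that turn in m_turn_waiting, and only that one queue is drained before the next queue gets a turn. The following self-contained model shows the same pattern with std::condition_variable standing in for the engine's Event; all names (TurnQueues, push, pop, shutdown) are illustrative, not the patch's API.

```cpp
#include <array>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

struct TurnQueues {
    std::array<std::queue<int>, 8> queues;
    std::mutex mtx;
    std::condition_variable cv;
    const size_t nthreads;
    size_t pending = 0;   // workers checked in for the next turn (m_turn_pending)
    size_t waiting = 0;   // items left in the current turn      (m_turn_waiting)
    int current = 0;      // queue whose turn it is              (m_current_queue)
    bool stop = false;    // m_request_stop

    explicit TurnQueues(size_t n) : nthreads(n) {}

    // Producer side, mirroring enqueueBlockEmergeEx(): if every worker is
    // parked and no turn is active, start a turn on the queue just filled.
    void push(int queue_idx, int item) {
        std::lock_guard<std::mutex> lock(mtx);
        queues[queue_idx].push(item);
        if (waiting == 0 && pending == nthreads) {
            current = queue_idx;
            waiting = 1;
            cv.notify_all();
        }
    }

    // Worker side, mirroring popBlockEmergeData().
    std::optional<int> pop() {
        std::unique_lock<std::mutex> lock(mtx);
        ++pending;                               // check in for the next turn
        while (true) {
            if (stop)                            // counters get reset on restart,
                return std::nullopt;             // as startThreads() does
            if (waiting > 0) {                   // current queue owns this turn
                int item = queues[current].front();
                queues[current].pop();
                --waiting;
                --pending;                       // back to "busy"
                return item;
            }
            if (pending == nthreads) {           // last arrival picks the next queue
                for (int i = 0; i < 8 && waiting == 0; i++) {
                    current = (current + 1) % 8;
                    waiting = queues[current].size();
                }
                if (waiting > 0) {
                    cv.notify_all();             // release the whole barrier
                    continue;
                }
            }
            cv.wait(lock);                       // wait for peers or for new work
        }
    }

    void shutdown() {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
        cv.notify_all();
    }
};
```

A worker then reduces to `while (auto item = tq.pop()) { /* emerge *item */ }`, which is the same shape the reworked EmergeThread::run() loop takes with popBlockEmergeData().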


void EmergeManager::reportCompletedEmerge(EmergeAction action)
{
assert((size_t)action < ARRLEN(m_completed_emerge_counter));
@@ -469,29 +534,22 @@ void EmergeThread::signal()
m_queue_event.signal();
}


bool EmergeThread::pushBlock(v3s16 pos)
{
m_block_queue.push(pos);
return true;
}


void EmergeThread::cancelPendingItems()
{
MutexAutoLock queuelock(m_emerge->m_queue_mutex);
assert(0 && "Trap");
//MutexAutoLock queuelock(m_emerge->m_queue_mutex);

while (!m_block_queue.empty()) {
BlockEmergeData bedata;
v3s16 pos;
//while (!m_block_queue.empty()) {
// BlockEmergeData bedata;
// v3s16 pos;

pos = m_block_queue.front();
m_block_queue.pop();
// pos = m_block_queue.front();
// m_block_queue.pop();

m_emerge->popBlockEmergeData(pos, &bedata);
// m_emerge->popBlockEmergeData(pos, &bedata);

runCompletionCallbacks(pos, EMERGE_CANCELLED, bedata.callbacks);
}
// runCompletionCallbacks(pos, EMERGE_CANCELLED, bedata.callbacks);
//}
}
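
cancelPendingItems() is currently stubbed out with a trap because there is no per-thread queue left to drain. One possible reimplementation against the shared queues could look like the sketch below; it is purely illustrative and not part of the patch, and the hypothetical runCallbacks stands in for whatever mechanism ends up invoking the EMERGE_CANCELLED callbacks.

```cpp
// Sketch: drain all eight shared queues under the queue mutex and cancel
// every enqueued block. Not part of the patch; runCallbacks is hypothetical.
void cancelAllPendingItems()
{
    MutexAutoLock queuelock(m_queue_mutex);

    for (auto &q : m_queues) {
        while (!q.empty()) {
            v3s16 pos = q.front();
            q.pop();

            auto it = m_blocks_enqueued.find(pos);
            if (it == m_blocks_enqueued.end())
                continue;

            runCallbacks(pos, EMERGE_CANCELLED, it->second.callbacks);
            m_blocks_enqueued.erase(it);
        }
    }
    m_peer_queue_count.clear();
    m_turn_waiting = 0;
}
```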


@@ -511,23 +569,6 @@ void EmergeThread::runCompletionCallbacks(v3s16 pos, EmergeAction action,
}
}


bool EmergeThread::popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata)
{
MutexAutoLock queuelock(m_emerge->m_queue_mutex);

if (m_block_queue.empty())
return false;

*pos = m_block_queue.front();
m_block_queue.pop();

m_emerge->popBlockEmergeData(*pos, bedata);

return true;
}


EmergeAction EmergeThread::getBlockOrStartGen(const v3s16 pos, bool allow_gen,
const std::string *from_db, MapBlock **block, BlockMakeData *bmdata)
{
@@ -678,19 +719,15 @@ void *EmergeThread::run()
}

try {
while (!stopRequested()) {
BlockEmergeData bedata;
BlockEmergeData bedata;
while (m_emerge->popBlockEmergeData(&bedata, &pos)) {

BlockMakeData bmdata;
EmergeAction action;
MapBlock *block = nullptr;

porting::TriggerMemoryTrim();

if (!popBlockEmerge(&pos, &bedata)) {
m_queue_event.wait();
continue;
}

g_profiler->add(m_name + ": processed [#]", 1);

if (blockpos_over_max_limit(pos))
@@ -731,6 +768,7 @@ void *EmergeThread::run()
"EmergeThread: Lua on_generated", SPT_AVG);

try {
// TODO!!cosin call on_generated
m_script->on_generated(&bmdata, m_mapgen->blockseed);
} catch (const LuaError &e) {
m_server->setAsyncFatalError(e);
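
With per-thread queues removed, the control flow across these hunks condenses to: a worker blocks inside popBlockEmergeData() until a turn hands it a position, and the manager shuts everything down by setting the stop flag and signalling every queue event. A compressed restatement follows (details such as Thread::stop() and the m_threads_active check are omitted for brevity):

```cpp
// Worker side (EmergeThread::run()), compressed:
BlockEmergeData bedata;
v3s16 pos;
while (m_emerge->popBlockEmergeData(&bedata, &pos)) {
    // ... load or generate the block at pos, then run completion callbacks ...
}
// falls out of the loop once m_request_stop is set and the event is signalled

// Manager side (stopThreads() / ~EmergeManager()), compressed:
m_request_stop = true;                       // observed inside popBlockEmergeData()
for (u32 i = 0; i != m_threads.size(); i++)
    m_threads[i]->m_queue_event.signal();    // wake any worker parked on the event
for (u32 i = 0; i != m_threads.size(); i++)
    m_threads[i]->wait();                    // join once each loop has returned
```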
18 changes: 14 additions & 4 deletions src/emerge.h
@@ -6,6 +6,8 @@

#include <map>
#include <mutex>
#include <queue>
#include <atomic>
#include "network/networkprotocol.h"
#include "irr_v3d.h"
#include "util/container.h"
@@ -114,6 +116,7 @@ class EmergeParams {
const SchematicManager *schemmgr);
};


class EmergeManager {
/* The mod API needs unchecked access to allow:
* - using decomgr or oremgr to place decos/ores
@@ -200,10 +203,20 @@ class EmergeManager {
// The map database
MapDatabaseAccessor *m_db = nullptr;

bool m_request_stop = false;
std::mutex m_queue_mutex;
std::mutex m_queue_pop_mutex;
std::map<v3s16, BlockEmergeData> m_blocks_enqueued;
std::unordered_map<u16, u32> m_peer_queue_count;


std::array<std::queue<v3s16>, 8> m_queues;
std::atomic<int> m_current_queue;

std::atomic<size_t> m_turn_waiting;
std::atomic<size_t> m_turn_pending;


u32 m_qlimit_total;
u32 m_qlimit_diskonly;
u32 m_qlimit_generate;
@@ -219,9 +232,6 @@
DecorationManager *decomgr;
SchematicManager *schemmgr;

// Requires m_queue_mutex held
EmergeThread *getOptimalThread();

bool pushBlockEmergeData(
v3s16 pos,
u16 peer_requested,
@@ -230,7 +240,7 @@
void *callback_param,
bool *entry_already_exists);

bool popBlockEmergeData(v3s16 pos, BlockEmergeData *bedata);
bool popBlockEmergeData(BlockEmergeData *bedata, v3s16 *pos);

void reportCompletedEmerge(EmergeAction action);
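
For quick reference, the roles of the new members as inferred from the emerge.cpp changes (the annotations are editorial, not part of the patch):

```cpp
bool m_request_stop = false;               // set by stopThreads() / ~EmergeManager()
std::mutex m_queue_mutex;                  // guards m_queues and the turn counters
std::mutex m_queue_pop_mutex;              // only referenced from commented-out code in this diff
std::array<std::queue<v3s16>, 8> m_queues; // one queue per mapchunk-parity class
std::atomic<int> m_current_queue;          // index of the queue whose turn it is
std::atomic<size_t> m_turn_waiting;        // items still to pop in the current turn
std::atomic<size_t> m_turn_pending;        // workers checked in, waiting for the next turn
```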

2 changes: 0 additions & 2 deletions src/emerge_internal.h
@@ -58,8 +58,6 @@ class EmergeThread : public Thread {
UniqueQueue<v3s16> *m_trans_liquid; //< non-null only when generating a mapblock

Event m_queue_event;
std::queue<v3s16> m_block_queue;

bool initScripting();

bool popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata);