Skip to content
This repository has been archived by the owner on Oct 5, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:hotosm/underpass
Browse files Browse the repository at this point in the history
  • Loading branch information
emi420 committed Jan 23, 2024
2 parents 2817759 + 2ab4ca5 commit 6807ef6
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 61 deletions.
11 changes: 6 additions & 5 deletions setup/underpass.sql
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ CREATE TABLE IF NOT EXISTS public.way_refs (
node_id int8
);

CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id);
CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC);
CREATE INDEX way_refs_node_id_idx ON public.way_refs (node_id);
CREATE INDEX way_refs_way_id_idx ON public.way_refs (way_id);

Expand All @@ -113,5 +113,6 @@ CREATE INDEX nodes_timestamp_idx ON public.nodes(timestamp DESC);
CREATE INDEX ways_poly_timestamp_idx ON public.ways_poly(timestamp DESC);
CREATE INDEX ways_line_timestamp_idx ON public.ways_line(timestamp DESC);

CREATE INDEX idx_changesets_hashtags ON changesets USING gin(hashtags);
CREATE INDEX idx_osm_id_status ON validation (osm_id)
CREATE INDEX idx_changesets_hashtags ON public.changesets USING gin(hashtags);
-- Index validation rows by osm_id. The statement is explicitly terminated so
-- that anything appended to this script later is not parsed as part of it.
CREATE INDEX idx_osm_id_status ON public.validation (osm_id);

127 changes: 86 additions & 41 deletions src/bootstrap/bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
#include <boost/function.hpp>
#include <boost/dll/import.hpp>
#include <boost/timer/timer.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/thread_pool.hpp>
#include <mutex>
#include <boost/thread/pthread/shared_mutex.hpp>
#include <string.h>

#include "utils/log.hh"
Expand All @@ -40,12 +45,23 @@ using namespace logger;

namespace bootstrap {

// Maximum number of ways a single worker task iterates over. Each database
// fetch in startProcessingWays() requests config.concurrency * PAGE_SIZE rows.
const int PAGE_SIZE = 100;

std::string
allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks) {
std::string queries = "";
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
queries += it->query ;
}
return queries;
}

void startProcessingWays(const underpassconfig::UnderpassConfig &config) {

std::cout << "Connecting to the database (" << config.underpass_db_url << ") ..." << std::endl;
std::cout << "Connecting to the database ..." << std::endl;
auto db = std::make_shared<Pq>();
if (!db->connect(config.underpass_db_url)) {
log_error("Could not connect to Underpass DB, aborting monitoring thread!");
std::cout << "Could not connect to Underpass DB, aborting bootstrapping thread!" << std::endl;
return;
}

Expand Down Expand Up @@ -76,26 +92,53 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {

for (auto table_it = tables.begin(); table_it != tables.end(); ++table_it) {
std::cout << std::endl << "Counting geometries ... " << std::endl;
int total = queryraw->getWaysCount(*table_it);
long int total = queryraw->getWaysCount(*table_it);
long int count = 0;
int num_chunks = total / PAGE_SIZE;

std::cout << "Total: " << total << std::endl;
if (total > 0) {
int count = 0;
long lastid = 0;
while (count < total) {
int percentage = (count * 100) / total;
std::cout << "Threads: " << config.concurrency << std::endl;
std::cout << "Page size: " << PAGE_SIZE << std::endl;
long lastid = 0;

int concurrentTasks = config.concurrency;
int taskIndex = 0;
std::chrono::steady_clock::time_point begin;
std::chrono::steady_clock::time_point end;

for (int chunkIndex = 1; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) {

int percentage = (count * 100) / total;

auto ways = std::make_shared<std::vector<OsmWay>>();
if (!config.norefs) {
ways = queryraw->getWaysFromDB(lastid, config.concurrency * PAGE_SIZE, *table_it);
} else {
ways = queryraw->getWaysFromDBWithoutRefs(lastid, config.concurrency * PAGE_SIZE, *table_it);
}

auto tasks = std::make_shared<std::vector<BootstrapTask>>(concurrentTasks);
boost::asio::thread_pool pool(concurrentTasks);
for (int taskIndex = 0; taskIndex < concurrentTasks; taskIndex++) {
auto taskWays = std::make_shared<std::vector<OsmWay>>();
WayTask wayTask {
std::ref(validator),
std::ref(queryvalidate),
config,
taskIndex,
std::ref(tasks),
std::ref(ways),
};
std::cout << "\r" << "Processing " << *table_it << ": " << count << "/" << total << " (" << percentage << "%)";
auto task = std::make_shared<BootstrapTask>();
WayTask wayTask;
wayTask.plugin = validator;
wayTask.queryvalidate = queryvalidate;
wayTask.queryraw = queryraw;
wayTask.task = task;
wayTask.lastid = lastid;

processWays(wayTask, *table_it, config);
db->query(task->query);
lastid = wayTask.lastid;
count += wayTask.processed;
boost::asio::post(pool, boost::bind(threadBootstrapTask, wayTask));
}

pool.join();

db->query(allTasksQueries(tasks));
lastid = ways->back().id;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
count += it->processed;
}
}
}
Expand All @@ -105,41 +148,43 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {

// This thread get started for every page of way
void
processWays(WayTask &wayTask, const std::string &tableName, const underpassconfig::UnderpassConfig &config)
threadBootstrapTask(WayTask wayTask)
{
#ifdef TIMING_DEBUG
boost::timer::auto_cpu_timer timer("bootstrap::processWays(wayTask): took %w seconds\n");
boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapTask(wayTask): took %w seconds\n");
#endif

auto plugin = wayTask.plugin;
auto task = wayTask.task;
auto queryvalidate = wayTask.queryvalidate;
auto queryraw = wayTask.queryraw;
auto lastid = wayTask.lastid;
auto config = wayTask.config;
auto taskIndex = wayTask.taskIndex;
auto tasks = wayTask.tasks;
auto ways = wayTask.ways;

auto ways = std::make_shared<std::vector<OsmWay>>();
if (!config.norefs) {
ways = queryraw->getWaysFromDB(lastid, tableName);
} else {
ways = queryraw->getWaysFromDBWithoutRefs(lastid, tableName);
}
wayTask.processed = ways->size();
if (wayTask.processed > 0) {
// Proccesing ways
for (auto way = ways->begin(); way != ways->end(); ++way) {
auto status = plugin->checkWay(*way, "building");
BootstrapTask task;
int processed = 0;

// Proccesing ways
for (int i = 0; i < PAGE_SIZE; ++i) {
// std::cout << "--> processing way " << i * taskIndex << std::endl;
if (i * taskIndex < ways->size()) {
auto way = ways->at(i * (taskIndex + 1));
// std::cout << "[task " << taskIndex << "] way " << i * (taskIndex + 1) << std::endl;
auto status = plugin->checkWay(way, "building");
for (auto status_it = status->status.begin(); status_it != status->status.end(); ++status_it) {
task->query += queryvalidate->applyChange(*status, *status_it);
task.query += queryvalidate->applyChange(*status, *status_it);
}
// Fill the way_refs table
if (!config.norefs) {
for (auto ref = way->refs.begin(); ref != way->refs.end(); ++ref) {
task->query += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way->id) + "," + std::to_string(*ref) + "); ";
for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) {
task.query += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + "); ";
}
}
++processed;
}
wayTask.lastid = ways->back().id;
}
task.processed = processed;
const std::lock_guard<std::mutex> lock(tasks_change_mutex);
(*tasks)[taskIndex] = task;

}

Expand Down
13 changes: 8 additions & 5 deletions src/bootstrap/bootstrap.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,23 @@ namespace bootstrap {
/// \brief Outcome of a single bootstrap task
struct BootstrapTask {
    /// SQL statements accumulated by the task (empty until something is appended)
    std::string query{};
    /// Number of ways the task has processed
    int processed{0};
};

/// \brief Data handed to the routine that processes one page of ways
struct WayTask {
// Validation plugin; checkWay() is called on it for every way
std::shared_ptr<Validate> plugin;
// Builds the SQL for each validation status through applyChange()
std::shared_ptr<QueryValidate> queryvalidate;
// Raw-data query helper used to fetch pages of ways from the database
std::shared_ptr<QueryRaw> queryraw;
// Task whose query string accumulates the generated SQL
std::shared_ptr<BootstrapTask> task;
// Number of ways handled; the caller adds it to its running count
int processed = 0;
// osm_id of the last way handled, used as the paging cursor for the next fetch
long lastid = 0;
// Copy of the configuration; `norefs` decides whether way_refs rows are generated
underpassconfig::UnderpassConfig config;
// Index of this task; also the slot in `tasks` that receives its result
int taskIndex;
// Result vector shared by all tasks
std::shared_ptr<std::vector<BootstrapTask>> tasks;
// Page of ways shared by all tasks
std::shared_ptr<std::vector<OsmWay>> ways;
};

void startProcessingWays(const underpassconfig::UnderpassConfig &config);

// Runs on a worker thread once for every page of ways
void processWays(WayTask &wayTask, const std::string &tableName, const underpassconfig::UnderpassConfig &config);
void threadBootstrapTask(WayTask wayTask);

// Locked by threadBootstrapTask() while it stores its result in the shared
// task vector.
// NOTE(review): `static` at namespace scope in a header gives every
// translation unit that includes this file its own, separate mutex object, so
// this only serializes code compiled into the same .cc file — confirm that is
// the intent (otherwise declare it `extern` here and define it once).
static std::mutex tasks_change_mutex;

}
16 changes: 12 additions & 4 deletions src/raw/queryraw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,14 +431,18 @@ int QueryRaw::getWaysCount(const std::string &tableName) {
}

std::shared_ptr<std::vector<OsmWay>>
QueryRaw::getWaysFromDB(int lastid, const std::string &tableName) {
QueryRaw::getWaysFromDB(long lastid, int pageSize, const std::string &tableName) {
std::string waysQuery;
if (tableName == QueryRaw::polyTable) {
waysQuery = "SELECT osm_id, refs, ST_AsText(ST_ExteriorRing(geom), 4326)";
} else {
waysQuery = "SELECT osm_id, refs, ST_AsText(geom, 4326)";
}
waysQuery += ", version, tags FROM " + tableName + " where osm_id > " + std::to_string(lastid) + " order by osm_id asc limit 500;";
if (lastid > 0) {
waysQuery += ", version, tags FROM " + tableName + " where osm_id < " + std::to_string(lastid) + " order by osm_id desc limit " + std::to_string(pageSize) + ";";
} else {
waysQuery += ", version, tags FROM " + tableName + " order by osm_id desc limit " + std::to_string(pageSize) + ";";
}

auto ways_result = dbconn->query(waysQuery);
// Fill vector of OsmWay objects
Expand Down Expand Up @@ -473,14 +477,18 @@ QueryRaw::getWaysFromDB(int lastid, const std::string &tableName) {
}

std::shared_ptr<std::vector<OsmWay>>
QueryRaw::getWaysFromDBWithoutRefs(int lastid, const std::string &tableName) {
QueryRaw::getWaysFromDBWithoutRefs(long lastid, int pageSize, const std::string &tableName) {
std::string waysQuery;
if (tableName == QueryRaw::polyTable) {
waysQuery = "SELECT osm_id, ST_AsText(ST_ExteriorRing(geom), 4326)";
} else {
waysQuery = "SELECT osm_id, ST_AsText(geom, 4326)";
}
waysQuery += ", tags FROM " + tableName + " where osm_id > " + std::to_string(lastid) + " order by osm_id asc limit 500;";
if (lastid > 0) {
waysQuery += ", tags FROM " + tableName + " where osm_id < " + std::to_string(lastid) + " order by osm_id desc limit " + std::to_string(pageSize) + ";";
} else {
waysQuery += ", tags FROM " + tableName + " order by osm_id desc limit " + std::to_string(pageSize) + ";";
}

auto ways_result = dbconn->query(waysQuery);
// Fill vector of OsmWay objects
Expand Down
4 changes: 2 additions & 2 deletions src/raw/queryraw.hh
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class QueryRaw {
// Get ways count
int getWaysCount(const std::string &tableName);
// Get ways by page
std::shared_ptr<std::vector<OsmWay>> getWaysFromDB(int page, const std::string &tableName);
std::shared_ptr<std::vector<OsmWay>> getWaysFromDBWithoutRefs(int lastid, const std::string &tableName);
std::shared_ptr<std::vector<OsmWay>> getWaysFromDB(long lastid, int pageSize, const std::string &tableName);
std::shared_ptr<std::vector<OsmWay>> getWaysFromDBWithoutRefs(long lastid, int pageSize, const std::string &tableName);

};

Expand Down
11 changes: 7 additions & 4 deletions utils/raw-underpass.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id);

CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC);
CREATE INDEX way_refs_node_id_idx ON public.way_refs (node_id);
CREATE INDEX way_refs_way_id_idx ON public.way_refs (way_id);

Expand All @@ -12,3 +11,7 @@ CREATE INDEX ways_line_version_idx ON public.ways_line (version);
CREATE INDEX nodes_timestamp_idx ON public.nodes(timestamp DESC);
CREATE INDEX ways_poly_timestamp_idx ON public.ways_poly(timestamp DESC);
CREATE INDEX ways_line_timestamp_idx ON public.ways_line(timestamp DESC);

CREATE INDEX idx_changesets_hashtags ON public.changesets USING gin(hashtags);
-- Index validation rows by osm_id. The statement is explicitly terminated so
-- that anything appended to this script later is not parsed as part of it.
CREATE INDEX idx_osm_id_status ON public.validation (osm_id);

0 comments on commit 6807ef6

Please sign in to comment.