Skip to content

Commit

Permalink
feat: Add host address in scheduler and worker command line argument (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sitaowang1998 authored Jan 13, 2025
1 parent 235b164 commit b0c84be
Show file tree
Hide file tree
Showing 17 changed files with 88 additions and 146 deletions.
12 changes: 9 additions & 3 deletions docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ To start the scheduler, run:
build/spider/src/spider/spider_scheduler \
--storage_url \
"jdbc:mariadb://localhost:3306/spider-storage?user=spider&password=password" \
--host "127.0.0.1" \
--port 6000
```

NOTE:

* If you used a different set of arguments to set up the storage backend, ensure you update the
`storage_url` argument in the command.
* In production, change the host to the real IP address of the machine running the scheduler.
* If the scheduler fails to bind to port `6000`, change the port in the command and try again.

## Setting up a worker
Expand All @@ -169,13 +171,17 @@ To start a worker, run:
build/spider/src/spider/spider_worker \
--storage_url \
"jdbc:mariadb://localhost:3306/spider-storage?user=spider&password=password" \
--port 6000
--host "127.0.0.1" \
--libs "build/libtasks.so"
```

NOTE:

If you used a different set of arguments to set up the storage backend, ensure you update the
`storage_url` argument in the command.
* If you used a different set of arguments to set up the storage backend, ensure you update the
`storage_url` argument in the command.
* In production, change the host to the real IP address of the machine running the worker.
* You can specify multiple task libraries to load. The task libraries must be built with linkage
to the Spider client library.

> [!TIP]
> You can start multiple workers to increase the number of concurrent tasks that can be run on the
Expand Down
14 changes: 2 additions & 12 deletions src/spider/client/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ Driver::Driver(std::string const& storage_url) {
throw ConnectionException(err.description);
}

std::optional<std::string> const optional_addr = core::get_address();
if (!optional_addr.has_value()) {
throw ConnectionException("Cannot get machine address");
}
std::string const& addr = optional_addr.value();
err = m_metadata_storage->add_driver(core::Driver{m_id, addr});
err = m_metadata_storage->add_driver(core::Driver{m_id});
if (!err.success()) {
if (core::StorageErrType::DuplicateKeyErr == err.type) {
throw DriverIdInUseException(m_id);
Expand Down Expand Up @@ -72,12 +67,7 @@ Driver::Driver(std::string const& storage_url, boost::uuids::uuid const id) : m_
throw ConnectionException(err.description);
}

std::optional<std::string> const optional_addr = core::get_address();
if (!optional_addr.has_value()) {
throw ConnectionException("Cannot get machine address");
}
std::string const& addr = optional_addr.value();
err = m_metadata_storage->add_driver(core::Driver{m_id, addr});
err = m_metadata_storage->add_driver(core::Driver{m_id});
if (!err.success()) {
if (core::StorageErrType::DuplicateKeyErr == err.type) {
throw DriverIdInUseException(m_id);
Expand Down
5 changes: 1 addition & 4 deletions src/spider/core/Driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ namespace spider::core {

class Driver {
public:
Driver(boost::uuids::uuid const id, std::string addr) : m_id{id}, m_addr{std::move(addr)} {}
explicit Driver(boost::uuids::uuid const id) : m_id{id} {}

[[nodiscard]] auto get_id() const -> boost::uuids::uuid const& { return m_id; }

[[nodiscard]] auto get_addr() const -> std::string const& { return m_addr; }

private:
boost::uuids::uuid m_id;
std::string m_addr;
};

class Scheduler {
Expand Down
33 changes: 0 additions & 33 deletions src/spider/io/BoostAsio.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef SPIDER_CORE_BOOSTASIO_HPP
#define SPIDER_CORE_BOOSTASIO_HPP

#include <optional>

// clang-format off
// IWYU pragma: begin_exports

Expand Down Expand Up @@ -40,35 +38,4 @@
// IWYU pragma: end_exports
// clang-format on

#include <string>

#include <spdlog/spdlog.h>

namespace spider::core {
// Resolves the name returned by boost::asio::ip::host_name() and returns one of the resulting
// IPv4 addresses in string form.
//
// Selection order:
//   1. the first resolved endpoint that is IPv4 and not loopback;
//   2. otherwise (after logging a warning) the first resolved endpoint that is IPv4 at all.
//
// Returns std::nullopt when no IPv4 endpoint is found, or when a boost::system::system_error is
// thrown while resolving.
inline auto get_address() -> std::optional<std::string> {
try {
boost::asio::io_context io_context;
boost::asio::ip::tcp::resolver resolver(io_context);
// Empty service string: only the host name is being resolved here.
auto const endpoints = resolver.resolve(boost::asio::ip::host_name(), "");
// First pass: prefer an IPv4 address that is not loopback.
for (auto const& endpoint : endpoints) {
if (endpoint.endpoint().address().is_v4()
&& !endpoint.endpoint().address().is_loopback())
{
return endpoint.endpoint().address().to_string();
}
}
// If no non-loopback address found, return loopback address
// (any IPv4 endpoint reached in this second pass was rejected above, i.e. it is loopback).
spdlog::warn("No non-loopback address found, using loopback address");
for (auto const& endpoint : endpoints) {
if (endpoint.endpoint().address().is_v4()) {
return endpoint.endpoint().address().to_string();
}
}
// No IPv4 endpoint at all.
return std::nullopt;
} catch (boost::system::system_error const& e) {
// The error is not logged or propagated; the caller only sees an empty optional.
return std::nullopt;
}
}
} // namespace spider::core

#endif // SPIDER_CORE_BOOSTASIO_HPP
18 changes: 11 additions & 7 deletions src/spider/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <system_error>
#include <thread>
Expand Down Expand Up @@ -42,6 +41,11 @@ namespace {
auto parse_args(int const argc, char** argv) -> boost::program_options::variables_map {
boost::program_options::options_description desc;
desc.add_options()("help", "spider scheduler");
desc.add_options()(
"host",
boost::program_options::value<std::string>(),
"scheduler host address"
);
desc.add_options()(
"port",
boost::program_options::value<unsigned short>(),
Expand Down Expand Up @@ -136,13 +140,19 @@ auto main(int argc, char** argv) -> int {
boost::program_options::variables_map const args = parse_args(argc, argv);

unsigned short port = 0;
std::string scheduler_addr;
std::string storage_url;
try {
if (!args.contains("port")) {
spdlog::error("port is required");
return cCmdArgParseErr;
}
port = args["port"].as<unsigned short>();
if (!args.contains("host")) {
spdlog::error("host is required");
return cCmdArgParseErr;
}
scheduler_addr = args["host"].as<std::string>();
if (!args.contains("storage_url")) {
spdlog::error("storage_url is required");
return cCmdArgParseErr;
Expand Down Expand Up @@ -185,12 +195,6 @@ auto main(int argc, char** argv) -> int {
// Get scheduler id and addr
boost::uuids::random_generator gen;
boost::uuids::uuid const scheduler_id = gen();
std::optional<std::string> const optional_scheduler_addr = spider::core::get_address();
if (!optional_scheduler_addr.has_value()) {
spdlog::error("Failed to get scheduler address");
return cSchedulerAddrErr;
}
std::string const& scheduler_addr = optional_scheduler_addr.value();

// Start scheduler server
spider::core::StopToken stop_token;
Expand Down
1 change: 0 additions & 1 deletion src/spider/storage/MetadataStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class MetadataStorage {

virtual auto add_driver(Driver const& driver) -> StorageErr = 0;
virtual auto add_scheduler(Scheduler const& scheduler) -> StorageErr = 0;
virtual auto get_driver(boost::uuids::uuid id, std::string* addr) -> StorageErr = 0;
virtual auto get_active_scheduler(std::vector<Scheduler>* schedulers) -> StorageErr = 0;

virtual auto
Expand Down
70 changes: 16 additions & 54 deletions src/spider/storage/MysqlStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ namespace spider::core {
namespace {
char const* const cCreateDriverTable = R"(CREATE TABLE IF NOT EXISTS `drivers` (
`id` BINARY(16) NOT NULL,
`address` VARCHAR(40) NOT NULL,
`heartbeat` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
))";

char const* const cCreateSchedulerTable = R"(CREATE TABLE IF NOT EXISTS `schedulers` (
`id` BINARY(16) NOT NULL,
`address` VARCHAR(40) NOT NULL,
`port` INT UNSIGNED NOT NULL,
`state` ENUM('normal', 'recovery', 'gc') NOT NULL,
CONSTRAINT `scheduler_driver_id` FOREIGN KEY (`id`) REFERENCES `drivers` (`id`) ON UPDATE NO ACTION ON DELETE CASCADE,
Expand Down Expand Up @@ -331,11 +331,10 @@ auto get_sql_string(sql::SQLString const& str) -> std::string {
auto MySqlMetadataStorage::add_driver(Driver const& driver) -> StorageErr {
try {
std::unique_ptr<sql::PreparedStatement> statement(
m_conn->prepareStatement("INSERT INTO `drivers` (`id`, `address`) VALUES (?, ?)")
m_conn->prepareStatement("INSERT INTO `drivers` (`id`) VALUES (?)")
);
sql::bytes id_bytes = uuid_get_bytes(driver.get_id());
statement->setBytes(1, &id_bytes);
statement->setString(2, driver.get_addr());
statement->executeUpdate();
} catch (sql::SQLException& e) {
m_conn->rollback();
Expand All @@ -351,17 +350,18 @@ auto MySqlMetadataStorage::add_driver(Driver const& driver) -> StorageErr {
auto MySqlMetadataStorage::add_scheduler(Scheduler const& scheduler) -> StorageErr {
try {
std::unique_ptr<sql::PreparedStatement> driver_statement(
m_conn->prepareStatement("INSERT INTO `drivers` (`id`, `address`) VALUES (?, ?)")
m_conn->prepareStatement("INSERT INTO `drivers` (`id`) VALUES (?)")
);
sql::bytes id_bytes = uuid_get_bytes(scheduler.get_id());
driver_statement->setBytes(1, &id_bytes);
driver_statement->setString(2, scheduler.get_addr());
driver_statement->executeUpdate();
std::unique_ptr<sql::PreparedStatement> scheduler_statement(m_conn->prepareStatement(
"INSERT INTO `schedulers` (`id`, `port`, `state`) VALUES (?, ?, 'normal')"
"INSERT INTO `schedulers` (`id`, `address`, `port`, `state`) "
"VALUES (?, ?, ?, 'normal')"
));
scheduler_statement->setBytes(1, &id_bytes);
scheduler_statement->setInt(2, scheduler.get_port());
scheduler_statement->setString(2, scheduler.get_addr());
scheduler_statement->setInt(3, scheduler.get_port());
scheduler_statement->executeUpdate();
} catch (sql::SQLException& e) {
m_conn->rollback();
Expand All @@ -374,31 +374,6 @@ auto MySqlMetadataStorage::add_scheduler(Scheduler const& scheduler) -> StorageE
return StorageErr{};
}

// Reads the `address` column of the `drivers` row whose `id` equals `id`.
//
// @param id   UUID of the driver to look up; bound to the query as raw bytes.
// @param addr Output: receives the stored address when a row is found. It is dereferenced
//             without a null check.
// @return An empty `StorageErr{}` after committing on success; `KeyNotFoundErr` (after rollback)
//         when the query returns no rows; `OtherErr` carrying `e.what()` (after rollback) when a
//         sql::SQLException is thrown.
auto MySqlMetadataStorage::get_driver(boost::uuids::uuid id, std::string* addr) -> StorageErr {
try {
std::unique_ptr<sql::PreparedStatement> statement(
m_conn->prepareStatement("SELECT `address` FROM `drivers` WHERE `id` = ?")
);
sql::bytes id_bytes = uuid_get_bytes(id);
statement->setBytes(1, &id_bytes);
std::unique_ptr<sql::ResultSet> res(statement->executeQuery());
// No matching row: roll back and report the missing key.
if (0 == res->rowsCount()) {
m_conn->rollback();
return StorageErr{
StorageErrType::KeyNotFoundErr,
fmt::format("no driver with id {}", boost::uuids::to_string(id))
};
}
// Advance to the first row before reading column 1 (`address`).
res->next();
*addr = get_sql_string(res->getString(1));
} catch (sql::SQLException& e) {
m_conn->rollback();
return StorageErr{StorageErrType::OtherErr, e.what()};
}
m_conn->commit();
return StorageErr{};
}

auto MySqlMetadataStorage::get_active_scheduler(std::vector<Scheduler>* schedulers) -> StorageErr {
try {
std::unique_ptr<sql::Statement> statement(m_conn->createStatement());
Expand Down Expand Up @@ -1513,35 +1488,22 @@ auto MySqlMetadataStorage::get_scheduler_state(boost::uuids::uuid id, std::strin
auto MySqlMetadataStorage::get_scheduler_addr(boost::uuids::uuid id, std::string* addr, int* port)
-> StorageErr {
try {
std::unique_ptr<sql::PreparedStatement> addr_statement(
m_conn->prepareStatement("SELECT `address` FROM `drivers` WHERE `id` = ?")
);
std::unique_ptr<sql::PreparedStatement> statement(m_conn->prepareStatement(
"SELECT `address`, `port` FROM `schedulers` WHERE `id` = ?"
));
sql::bytes id_bytes = uuid_get_bytes(id);
addr_statement->setBytes(1, &id_bytes);
std::unique_ptr<sql::ResultSet> addr_res{addr_statement->executeQuery()};
if (addr_res->rowsCount() == 0) {
m_conn->rollback();
return StorageErr{
StorageErrType::KeyNotFoundErr,
fmt::format("no driver with id {}", boost::uuids::to_string(id))
};
}
std::unique_ptr<sql::PreparedStatement> port_statement(
m_conn->prepareStatement("SELECT `port` FROM `schedulers` WHERE `id` = ?")
);
port_statement->setBytes(1, &id_bytes);
std::unique_ptr<sql::ResultSet> port_res{port_statement->executeQuery()};
if (port_res->rowsCount() == 0) {
statement->setBytes(1, &id_bytes);
std::unique_ptr<sql::ResultSet> res{statement->executeQuery()};
if (res->rowsCount() == 0) {
m_conn->rollback();
return StorageErr{
StorageErrType::KeyNotFoundErr,
fmt::format("no scheduler with id {}", boost::uuids::to_string(id))
};
}
addr_res->next();
*addr = get_sql_string(addr_res->getString(1));
port_res->next();
*port = port_res->getInt(1);
res->next();
*addr = get_sql_string(res->getString(1));
*port = res->getInt(2);
} catch (sql::SQLException& e) {
m_conn->rollback();
return StorageErr{StorageErrType::OtherErr, e.what()};
Expand Down
1 change: 0 additions & 1 deletion src/spider/storage/MysqlStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class MySqlMetadataStorage : public MetadataStorage {
auto initialize() -> StorageErr override;
auto add_driver(Driver const& driver) -> StorageErr override;
auto add_scheduler(Scheduler const& scheduler) -> StorageErr override;
auto get_driver(boost::uuids::uuid id, std::string* addr) -> StorageErr override;
auto get_active_scheduler(std::vector<Scheduler>* schedulers) -> StorageErr override;
auto
add_job(boost::uuids::uuid job_id, boost::uuids::uuid client_id, TaskGraph const& task_graph
Expand Down
23 changes: 14 additions & 9 deletions src/spider/worker/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ auto parse_args(int const argc, char** argv) -> boost::program_options::variable
boost::program_options::value<std::vector<std::string>>(),
"dynamic libraries that include the spider tasks"
);
desc.add_options()("host", boost::program_options::value<std::string>(), "worker host address");

boost::program_options::variables_map variables;
boost::program_options::store(
Expand Down Expand Up @@ -332,12 +333,22 @@ auto main(int argc, char** argv) -> int {

std::string storage_url;
std::vector<std::string> libs;
std::string worker_addr;
try {
if (!args.contains("storage_url") || !args.contains("libs")) {
spdlog::error("Error: missing required arguments");
if (!args.contains("storage_url")) {
spdlog::error("Missing storage_url");
return cCmdArgParseErr;
}
storage_url = args["storage_url"].as<std::string>();
if (!args.contains("host")) {
spdlog::error("Missing host");
return cCmdArgParseErr;
}
worker_addr = args["host"].as<std::string>();
if (!args.contains("libs") || args["libs"].empty()) {
spdlog::error("Missing libs");
return cCmdArgParseErr;
}
libs = args["libs"].as<std::vector<std::string>>();
} catch (boost::bad_any_cast const& e) {
spdlog::error("Error: {}", e.what());
Expand All @@ -362,16 +373,10 @@ auto main(int argc, char** argv) -> int {
spdlog::error("Cannot connect to data storage: {}", err.description);
return cStorageConnectionErr;
}
std::optional<std::string> const optional_worker_addr = spider::core::get_address();
if (!optional_worker_addr.has_value()) {
spdlog::error("Failed to get worker address");
return cWorkerAddrErr;
}
std::string const& worker_addr = optional_worker_addr.value();

boost::uuids::random_generator gen;
boost::uuids::uuid const worker_id = gen();
spider::core::Driver driver{worker_id, worker_addr};
spider::core::Driver driver{worker_id};
err = metadata_store->add_driver(driver);
if (!err.success()) {
spdlog::error("Cannot add driver to metadata storage: {}", err.description);
Expand Down
5 changes: 1 addition & 4 deletions tests/integration/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class TaskGraph:
@dataclass
class Driver:
id: uuid.UUID
addr: str


@dataclass
Expand Down Expand Up @@ -180,9 +179,7 @@ def remove_job(conn, job_id: uuid.UUID):
def add_driver(conn, driver: Driver):
cursor = conn.cursor()

cursor.execute(
"INSERT INTO drivers (id, address) VALUES (%s, %s)", (driver.id.bytes, driver.addr)
)
cursor.execute("INSERT INTO drivers (id) VALUES (%s)", (driver.id.bytes,))

conn.commit()
cursor.close()
Expand Down
Loading

0 comments on commit b0c84be

Please sign in to comment.