diff --git a/driver/CMakeLists.txt b/driver/CMakeLists.txt index 2e76cf1..5698095 100644 --- a/driver/CMakeLists.txt +++ b/driver/CMakeLists.txt @@ -17,7 +17,9 @@ add_library(${DRIVER_NAME} MODULE log_manager.cpp log_sender.cpp log_syslog.cpp + receiver.cpp rpc_serdes.cpp + sender.cpp tracer.cpp uid_generator.cpp ) @@ -28,6 +30,7 @@ target_link_libraries(${DRIVER_NAME} PRIVATE ${RPC_LIB} # third-party libs + roc aspl::libASPL gRPC::grpc++_unsecure spdlog::spdlog diff --git a/driver/device.cpp b/driver/device.cpp index 6683f36..23ee357 100644 --- a/driver/device.cpp +++ b/driver/device.cpp @@ -8,6 +8,8 @@ #include "device.hpp" #include "build_info.hpp" +#include "receiver.hpp" +#include "sender.hpp" #include #include @@ -116,6 +118,7 @@ aspl::StreamParameters make_stream_params(const DeviceInfo& info) } // namespace Device::Device(std::shared_ptr hal_plugin, + std::shared_ptr network_context, IndexAllocator& index_allocator, UidGenerator& uid_generator, const DeviceInfo& device_info) @@ -151,7 +154,21 @@ Device::Device(std::shared_ptr hal_plugin, hal_device_->AddStreamWithControlsAsync(make_stream_params(info_)); - // TODO: roc_open + // we implement both interfaces + hal_device_->SetControlHandler(this); + hal_device_->SetIOHandler(this); + + if (info_.type == DeviceType::Sender) { + network_transceiver_ = std::make_unique( + network_context, info_.uid, info_.device_encoding, *info_.sender_config); + } else { + network_transceiver_ = std::make_unique( + network_context, info_.uid, info_.device_encoding, *info_.receiver_config); + } + + if (!info_.enabled) { + network_transceiver_->pause(); + } sort_endpoints_(); @@ -176,7 +193,7 @@ Device::~Device() toggle(false); - // TODO: roc_close + network_transceiver_.reset(); if (info_.index != 0) { index_allocator_.release(info_.index); @@ -197,6 +214,7 @@ void Device::toggle(bool enabled) spdlog::debug("enabling device {}", info_.uid); hal_plugin_->AddDevice(hal_device_); + network_transceiver_->resume(); } else { spdlog::debug("device {} is already enabled", info_.uid); } @@ -205,6 +223,7 @@ void Device::toggle(bool enabled) spdlog::debug("disabling device {}", info_.uid); hal_plugin_->RemoveDevice(hal_device_); + network_transceiver_->pause(); } else { spdlog::debug("device {} is already disabled", info_.uid); } @@ -240,7 +259,7 @@ void Device::bind_endpoint_(DeviceEndpointInfo& endpoint_info) endpoint_info.slot, endpoint_info.uri); - // TODO: roc_bind + network_transceiver_->bind(endpoint_info); } void Device::connect_endpoint_(DeviceEndpointInfo& endpoint_info) @@ -250,7 +269,7 @@ void Device::connect_endpoint_(DeviceEndpointInfo& endpoint_info) endpoint_info.slot, endpoint_info.uri); - // TODO: roc_connect + network_transceiver_->connect(endpoint_info); } void Device::sort_endpoints_() @@ -268,4 +287,43 @@ void Device::sort_endpoints_() } } +// aspl::ControlRequestHandler +OSStatus Device::OnStartIO() +{ + spdlog::info("starting io on device {}", info_.uid); + + network_transceiver_->resume(); + + return kAudioHardwareNoError; +} + +void Device::OnStopIO() +{ + spdlog::info("stopping io on device {}", info_.uid); + + network_transceiver_->pause(); +} + +// aspl::IORequestHandler +void Device::OnReadClientInput(const std::shared_ptr& client, + const std::shared_ptr& stream, + Float64 zero_timestamp, + Float64 timestamp, + void* bytes, + UInt32 bytes_count) +{ + // TODO: ringbuf + network_transceiver_->read((uint64_t)timestamp, bytes, bytes_count); +} + +void Device::OnWriteMixedOutput(const std::shared_ptr& stream, + Float64 zero_timestamp, + Float64 timestamp, + const void* bytes, + UInt32 bytes_count) +{ + // TODO: ringbuf + network_transceiver_->write((uint64_t)timestamp, bytes, bytes_count); +} + } // namespace rocvad diff --git a/driver/device.hpp b/driver/device.hpp index 4b4c19e..0a77c7b 100644 --- a/driver/device.hpp +++ b/driver/device.hpp @@ -10,8 +10,11 @@ #include "device_defs.hpp" #include "index_allocator.hpp" +#include "transceiver.hpp" #include "uid_generator.hpp" +#include + #include #include @@ -20,10 +23,11 @@ namespace rocvad { // Correspond to one virtual device. -class Device +class Device : private aspl::ControlRequestHandler, private aspl::IORequestHandler { public: Device(std::shared_ptr hal_plugin, + std::shared_ptr network_context, IndexAllocator& index_allocator, UidGenerator& uid_generator, const DeviceInfo& device_info); @@ -40,6 +44,24 @@ class Device DeviceEndpointInfo connect(DeviceEndpointInfo endpoint_info); private: + // aspl::ControlRequestHandler + OSStatus OnStartIO() override; + void OnStopIO() override; + + // aspl::IORequestHandler + void OnReadClientInput(const std::shared_ptr& client, + const std::shared_ptr& stream, + Float64 zero_timestamp, + Float64 timestamp, + void* bytes, + UInt32 bytes_count) override; + void OnWriteMixedOutput(const std::shared_ptr& stream, + Float64 zero_timestamp, + Float64 timestamp, + const void* bytes, + UInt32 bytes_count) override; + + // endpoints void bind_endpoint_(DeviceEndpointInfo& endpoint_info); void connect_endpoint_(DeviceEndpointInfo& endpoint_info); void sort_endpoints_(); @@ -54,6 +76,9 @@ class Device std::shared_ptr hal_plugin_; std::shared_ptr hal_device_; + // network sender or receiver + // which one is used depends on device type + std::unique_ptr network_transceiver_; // run-time device info DeviceInfo info_; diff --git a/driver/device_manager.cpp b/driver/device_manager.cpp index 0d1be28..316a22a 100644 --- a/driver/device_manager.cpp +++ b/driver/device_manager.cpp @@ -16,6 +16,30 @@ namespace rocvad { +namespace { + +std::shared_ptr make_network_context() +{ + roc_context_config config; + memset(&config, 0, sizeof(config)); + + roc_context* context = nullptr; + int err = roc_context_open(&config, &context); + if (err < 0) { + spdlog::critical("can't open network context: err={}", err); + return {}; + } + + return std::shared_ptr(context, [](roc_context* context) { + int err = roc_context_close(context); + if (err < 0) { + spdlog::critical("can't close network context: err={}", err); + } + }); +} + +} // namespace + DeviceManager::DeviceManager(std::shared_ptr hal_plugin, std::shared_ptr hal_storage) : hal_plugin_(hal_plugin) @@ -23,6 +47,8 @@ DeviceManager::DeviceManager(std::shared_ptr hal_plugin, { assert(hal_plugin_); + network_context_ = make_network_context(); + load_devices_(); } @@ -71,8 +97,8 @@ DeviceInfo DeviceManager::add_device(DeviceInfo info) fmt::format("device with uid \"{}\" already exists", info.uid)); } - auto device = - std::make_shared(hal_plugin_, index_allocator_, uid_generator_, info); + auto device = std::make_shared( + hal_plugin_, network_context_, index_allocator_, uid_generator_, info); info = device->info(); diff --git a/driver/device_manager.hpp b/driver/device_manager.hpp index ea30e2b..8aa0eff 100644 --- a/driver/device_manager.hpp +++ b/driver/device_manager.hpp @@ -13,6 +13,8 @@ #include "index_allocator.hpp" #include "uid_generator.hpp" +#include + #include #include @@ -75,6 +77,8 @@ class DeviceManager UidGenerator uid_generator_; DeviceStorage device_storage_; + + std::shared_ptr network_context_; }; } // namespace rocvad diff --git a/driver/receiver.cpp b/driver/receiver.cpp new file mode 100644 index 0000000..909f227 --- /dev/null +++ b/driver/receiver.cpp @@ -0,0 +1,40 @@ +/* + * Copyright (c) Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "receiver.hpp" + +namespace rocvad { + +Receiver::Receiver(std::shared_ptr network_context, + const std::string& uid, + const DeviceLocalEncoding& device_encoding, + const DeviceReceiverConfig& receiver_config) +{ +} + +void Receiver::bind(DeviceEndpointInfo& endpoint_info) +{ +} + +void Receiver::connect(DeviceEndpointInfo& endpoint_info) +{ +} + +void Receiver::pause() noexcept +{ +} + +void Receiver::resume() noexcept +{ +} + +void Receiver::read(uint64_t timestamp, void* bytes, size_t n_bytes) noexcept +{ +} + +} // namespace rocvad diff --git a/driver/receiver.hpp b/driver/receiver.hpp new file mode 100644 index 0000000..7c80419 --- /dev/null +++ b/driver/receiver.hpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include "transceiver.hpp" + +#include +#include + +namespace rocvad { + +class Receiver : public Transceiver +{ +public: + Receiver(std::shared_ptr network_context, + const std::string& uid, + const DeviceLocalEncoding& device_encoding, + const DeviceReceiverConfig& receiver_config); + + void bind(DeviceEndpointInfo& endpoint_info) override; + void connect(DeviceEndpointInfo& endpoint_info) override; + + void pause() noexcept override; + void resume() noexcept override; + + void read(uint64_t timestamp, void* bytes, size_t n_bytes) noexcept override; + +private: + std::string uid_; + + std::shared_ptr net_context_; + std::shared_ptr net_receiver_; +}; + +} // namespace rocvad diff --git a/driver/sender.cpp b/driver/sender.cpp new file mode 100644 index 0000000..7621c6f --- /dev/null +++ b/driver/sender.cpp @@ -0,0 +1,40 @@ +/* + * Copyright (c) Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "sender.hpp" + +namespace rocvad { + +Sender::Sender(std::shared_ptr network_context, + const std::string& uid, + const DeviceLocalEncoding& device_encoding, + const DeviceSenderConfig& sender_config) +{ +} + +void Sender::bind(DeviceEndpointInfo& endpoint_info) +{ +} + +void Sender::connect(DeviceEndpointInfo& endpoint_info) +{ +} + +void Sender::pause() noexcept +{ +} + +void Sender::resume() noexcept +{ +} + +void Sender::write(uint64_t timestamp, const void* bytes, size_t n_bytes) noexcept +{ +} + +} // namespace rocvad diff --git a/driver/sender.hpp b/driver/sender.hpp new file mode 100644 index 0000000..73e701f --- /dev/null +++ b/driver/sender.hpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include "transceiver.hpp" + +#include +#include + +namespace rocvad { + +class Sender : public Transceiver +{ +public: + Sender(std::shared_ptr network_context, + const std::string& uid, + const DeviceLocalEncoding& device_encoding, + const DeviceSenderConfig& sender_config); + + void bind(DeviceEndpointInfo& endpoint_info) override; + void connect(DeviceEndpointInfo& endpoint_info) override; + + void pause() noexcept override; + void resume() noexcept override; + + void write(uint64_t timestamp, const void* bytes, size_t n_bytes) noexcept override; + +private: + std::string uid_; + + std::shared_ptr net_context_; + std::shared_ptr net_sender_; +}; + +} // namespace rocvad diff --git a/driver/transceiver.hpp b/driver/transceiver.hpp new file mode 100644 index 0000000..8b3c531 --- /dev/null +++ b/driver/transceiver.hpp @@ -0,0 +1,50 @@ +/* + * Copyright (c) Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include "device_defs.hpp" + +#include +#include +#include + +namespace rocvad { + +// Base class for sender and receiver +class Transceiver +{ +public: + Transceiver() = default; + virtual ~Transceiver() = default; + + Transceiver(const Transceiver&) = delete; + Transceiver& operator=(const Transceiver&) = delete; + + // may modify endpoint_info + // may throw + virtual void bind(DeviceEndpointInfo& endpoint_info) = 0; + virtual void connect(DeviceEndpointInfo& endpoint_info) = 0; + + // called when I/O is stopped / started + virtual void pause() noexcept = 0; + virtual void resume() noexcept = 0; + + // for sender + virtual void write(uint64_t timestamp, const void* bytes, size_t n_bytes) noexcept + { + } + + // for receiver + virtual void read(uint64_t timestamp, void* bytes, size_t n_bytes) noexcept + { + memset(bytes, 0, n_bytes); + } +}; + +} // namespace rocvad