Skip to content

Commit

Permalink
Store devices in persistent storage
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed Jun 28, 2023
1 parent 5842345 commit 229ff95
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 31 deletions.
1 change: 1 addition & 0 deletions driver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ find_library(LIB_Foundation Foundation REQUIRED)
add_library(${DRIVER_NAME} MODULE
device.cpp
device_manager.cpp
device_storage.cpp
driver.cpp
driver_service.cpp
entry_point.cpp
Expand Down
62 changes: 57 additions & 5 deletions driver/device_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
#include "device_manager.hpp"

#include <fmt/core.h>
#include <spdlog/spdlog.h>

#include <cassert>
#include <stdexcept>

namespace rocvad {

DeviceManager::DeviceManager(std::shared_ptr<aspl::Plugin> plugin)
DeviceManager::DeviceManager(std::shared_ptr<aspl::Plugin> plugin,
std::shared_ptr<aspl::Storage> storage)
: plugin_(plugin)
, device_storage_(storage)
{
assert(plugin_);

load_devices_();
}

std::vector<DeviceInfo> DeviceManager::get_all_devices()
Expand Down Expand Up @@ -80,6 +85,8 @@ DeviceInfo DeviceManager::add_device(DeviceInfo info)
device_by_index_[info.index] = device;
device_by_uid_[info.uid] = device;

save_devices_();

return info;
}

Expand All @@ -95,6 +102,8 @@ void DeviceManager::delete_device(index_t index)

device_by_index_.erase(info.index);
device_by_uid_.erase(info.uid);

save_devices_();
}

void DeviceManager::delete_device(const std::string& uid)
Expand All @@ -109,15 +118,20 @@ void DeviceManager::delete_device(const std::string& uid)

device_by_index_.erase(info.index);
device_by_uid_.erase(info.uid);

save_devices_();
}

DeviceEndpointInfo DeviceManager::bind_device(index_t index, DeviceEndpointInfo endpoint)
{
std::lock_guard lock(mutex_);

auto device = find_device_(index);
auto info = device->bind(endpoint);

save_devices_();

return device->bind(endpoint);
return info;
}

DeviceEndpointInfo DeviceManager::bind_device(const std::string& uid,
Expand All @@ -126,8 +140,11 @@ DeviceEndpointInfo DeviceManager::bind_device(const std::string& uid,
std::lock_guard lock(mutex_);

auto device = find_device_(uid);
auto info = device->bind(endpoint);

return device->bind(endpoint);
save_devices_();

return info;
}

DeviceEndpointInfo DeviceManager::connect_device(index_t index,
Expand All @@ -136,8 +153,11 @@ DeviceEndpointInfo DeviceManager::connect_device(index_t index,
std::lock_guard lock(mutex_);

auto device = find_device_(index);
auto info = device->connect(endpoint);

save_devices_();

return device->connect(endpoint);
return info;
}

DeviceEndpointInfo DeviceManager::connect_device(const std::string& uid,
Expand All @@ -146,8 +166,11 @@ DeviceEndpointInfo DeviceManager::connect_device(const std::string& uid,
std::lock_guard lock(mutex_);

auto device = find_device_(uid);
auto info = device->connect(endpoint);

save_devices_();

return device->connect(endpoint);
return info;
}

std::shared_ptr<Device> DeviceManager::find_device_(index_t index)
Expand All @@ -174,4 +197,33 @@ std::shared_ptr<Device> DeviceManager::find_device_(const std::string& uid)
return device;
}

void DeviceManager::load_devices_()
{
auto device_list = device_storage_.load_devices();
if (device_list.empty()) {
return;
}

spdlog::info("adding {} device(s) from persistent storage", device_list.size());

for (const auto& device_info : device_list) {
try {
add_device(device_info);
}
catch (std::exception& e) {
spdlog::warn("ignoring invalid device: index={}, uid=\"{}\": {}",
device_info.index,
device_info.uid,
e.what());
}
}
}

void DeviceManager::save_devices_()
{
auto device_list = get_all_devices();

device_storage_.save_devices(device_list);
}

} // namespace rocvad
14 changes: 12 additions & 2 deletions driver/device_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
#pragma once

#include "device.hpp"
#include "device_storage.hpp"
#include "index_allocator.hpp"
#include "uid_generator.hpp"

#include <aspl/Plugin.hpp>
#include <aspl/Storage.hpp>

#include <map>
#include <memory>
Expand All @@ -29,7 +31,8 @@ class DeviceManager
public:
using index_t = IndexAllocator::index_t;

DeviceManager(std::shared_ptr<aspl::Plugin> plugin);
DeviceManager(std::shared_ptr<aspl::Plugin> plugin,
std::shared_ptr<aspl::Storage> storage);

DeviceManager(const DeviceManager&) = delete;
DeviceManager& operator=(const DeviceManager&) = delete;
Expand All @@ -55,13 +58,20 @@ class DeviceManager
std::shared_ptr<Device> find_device_(index_t index);
std::shared_ptr<Device> find_device_(const std::string& uid);

std::mutex mutex_;
void load_devices_();
void save_devices_();

std::recursive_mutex mutex_;

std::shared_ptr<aspl::Plugin> plugin_;

std::map<uint32_t, std::shared_ptr<Device>> device_by_index_;
std::unordered_map<std::string, std::shared_ptr<Device>> device_by_uid_;

IndexAllocator index_allocator_;
UidGenerator uid_generator_;

DeviceStorage device_storage_;
};

} // namespace rocvad
71 changes: 71 additions & 0 deletions driver/device_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "device_storage.hpp"
#include "driver_protocol.hpp"
#include "rpc_serdes.hpp"

#include <spdlog/spdlog.h>

#include <cassert>

namespace rocvad {

DeviceStorage::DeviceStorage(std::shared_ptr<aspl::Storage> storage)
: storage_(storage)
{
assert(storage_);
}

std::vector<DeviceInfo> DeviceStorage::load_devices()
{
auto [bytes, ok] = storage_->ReadBytes("device_list");
if (!ok) {
spdlog::debug("found no device list in persistent storage");
return {};
}

if (bytes.size() == 0) {
spdlog::debug("loaded empty device list from persistent storage");
return {};
}

PrDeviceList msg;
if (!msg.ParseFromArray(bytes.data(), bytes.size())) {
spdlog::warn("ignoring invalid device list from persistent storage");
return {};
}

spdlog::debug("loaded device list from persistent storage");

std::vector<DeviceInfo> devices;
device_list_from_rpc(devices, msg);

return devices;
}

void DeviceStorage::save_devices(const std::vector<DeviceInfo>& devices)
{
PrDeviceList msg;
device_list_to_rpc(msg, devices);

std::vector<UInt8> bytes(msg.ByteSizeLong());

if (bytes.size() != 0) {
msg.SerializeToArray(bytes.data(), bytes.size());
}

if (!storage_->WriteBytes("device_list", bytes)) {
spdlog::warn("failed to save device list to persistent storage");
return;
}

spdlog::debug("saved updated device list to persistent storage");
}

} // namespace rocvad
37 changes: 37 additions & 0 deletions driver/device_storage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#pragma once

#include "device_defs.hpp"

#include <aspl/Storage.hpp>

#include <memory>
#include <vector>

namespace rocvad {

// Persistent storage for device info.
class DeviceStorage
{
public:
DeviceStorage(std::shared_ptr<aspl::Storage> storage);

DeviceStorage(const DeviceStorage&) = delete;
DeviceStorage& operator=(const DeviceStorage&) = delete;

std::vector<DeviceInfo> load_devices();

void save_devices(const std::vector<DeviceInfo>& devices);

private:
std::shared_ptr<aspl::Storage> storage_;
};

} // namespace rocvad
65 changes: 45 additions & 20 deletions driver/driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,44 @@

namespace rocvad {

Driver::Driver()
{
log_manager_ = std::make_shared<LogManager>();

spdlog::info("received initialization request");
namespace {

aspl::PluginParameters pluginParams;
pluginParams.Manufacturer = "roc-streaming.org";
aspl::PluginParameters make_plugin_params()
{
aspl::PluginParameters plugin_params;

auto context = std::make_shared<aspl::Context>(std::make_shared<Tracer>());
plugin_params.Manufacturer = "roc-streaming.org";

plugin_ = std::make_shared<aspl::Plugin>(context, pluginParams);
driver_ = std::make_shared<aspl::Driver>(context, plugin_);
return plugin_params;
}

device_manager_ = std::make_shared<DeviceManager>(plugin_);
driver_service_ = std::make_unique<DriverService>(log_manager_, device_manager_);
} // namespace

const std::string driver_socket = PlistInfo::driver_socket();
Driver::Driver()
{
// create log manager first to enable logging
log_manager_ = std::make_shared<LogManager>();

spdlog::info("starting rpc server at {}", driver_socket);
spdlog::info("creating driver");

grpc::ServerBuilder rpc_builder;
auto tracer = std::make_shared<Tracer>();
auto context = std::make_shared<aspl::Context>(tracer);

rpc_builder.AddListeningPort(driver_socket, grpc::InsecureServerCredentials());
rpc_builder.RegisterService(driver_service_.get());
plugin_ = std::make_shared<aspl::Plugin>(context, make_plugin_params());
storage_ = std::make_shared<aspl::Storage>(context);
driver_ = std::make_shared<aspl::Driver>(context, plugin_, storage_);

rpc_server_ = rpc_builder.BuildAndStart();
// will invoke OnInitialize() later
driver_->SetDriverHandler(this);
}

Driver::~Driver()
{
spdlog::info("received deinitialization request");
spdlog::info("destroying driver");

rpc_server_->Shutdown();
if (rpc_server_) {
rpc_server_->Shutdown();
}
}

AudioServerPlugInDriverRef Driver::reference()
Expand All @@ -57,4 +61,25 @@ AudioServerPlugInDriverRef Driver::reference()
return driver_->GetReference();
}

OSStatus Driver::OnInitialize()
{
spdlog::info("received initialization request");

device_manager_ = std::make_shared<DeviceManager>(plugin_, storage_);
driver_service_ = std::make_unique<DriverService>(log_manager_, device_manager_);

const std::string driver_socket = PlistInfo::driver_socket();

spdlog::info("starting rpc server at {}", driver_socket);

grpc::ServerBuilder rpc_builder;

rpc_builder.AddListeningPort(driver_socket, grpc::InsecureServerCredentials());
rpc_builder.RegisterService(driver_service_.get());

rpc_server_ = rpc_builder.BuildAndStart();

return kAudioHardwareNoError;
}

} // namespace rocvad
Loading

0 comments on commit 229ff95

Please sign in to comment.