Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CACAO streams alongside data streams. #264

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions catkit2/testbed/service_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,26 @@ def __init__(self, testbed, service_id):
# The import is here instead of at the top of the file to avoid circular imports.
from .testbed_proxy import TestbedProxy # noqa: E402
object.__setattr__(self, '_testbed', TestbedProxy(getattr(super(), 'testbed').host, getattr(super(), 'testbed').port))
object.__setattr__(self, '_cacao_streams', {})

@property
def testbed(self):
return self._testbed

def get_cacao_stream(self, name):
try:
from pyMilk.interfacing.isio_shmlib import SHM

# Load cacao stream if we didn't already open it.
if name not in self._cacao_streams:
shm_name = self.cacao_stream_names[name]
stream = SHM(shm_name)
self._cacao_streams[name] = stream

return self._cacao_streams[name]
except ImportError as e:
raise RuntimeError("pyMilk install is required for using CACAO streams.") from e

def __getattr__(self, name):
'''Get a property, command or data stream.

Expand Down Expand Up @@ -59,6 +74,9 @@ def cmd(**kwargs):
elif name in self.data_stream_names:
# Return datastream.
return self.get_data_stream(name)
elif name in self.cacao_stream_names:
# Return cacaostream
return self.get_cacao_stream(name)
else:
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'.")

Expand All @@ -85,6 +103,8 @@ def __setattr__(self, name, value):
raise AttributeError('Cannot set a command.')
elif name in self.data_stream_names:
raise AttributeError('Cannot set a data stream. Did you mean .submit_data()?')
elif name in self.cacao_stream_names:
raise AttributeError('Cannot set a cacao stream. Did you mean .set_data()?')
else:
super().__setattr__(name, value)

Expand Down
7 changes: 7 additions & 0 deletions catkit_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ find_package(Protobuf REQUIRED)
target_include_directories(catkit_core PUBLIC ${PROTOBUF_INCLUDE_DIR})
target_link_libraries(catkit_core PUBLIC ${PROTOBUF_LIBRARY})

# Optionally link Milk
if(USE_MILK)
target_include_directories(catkit_core ${MILK_INSTALLDIR}/include)
target_link_directories(catkit_core ${MILK_INSTALLDIR}/lib)
target_link_libraries(catkit_core PUBLIC ImageStreamIO)
endif(USE_MILK)

# Add install files
install(TARGETS catkit_core DESTINATION lib)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}
Expand Down
16 changes: 16 additions & 0 deletions catkit_core/CacaoStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#ifndef CACAO_STREAM_H
#define CACAO_STREAM_H

#ifdef USE_MILK

#include "ImageStreamIO.h"
using CacaoStream = IMAGE;

#else

// Use a placeholder for the CacaoStream struct.
using CacaoStream = void;

#endif

#endif // CACAO_STREAM_H
35 changes: 35 additions & 0 deletions catkit_core/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ Service::Service(string service_type, string service_id, int service_port, int t

Service::~Service()
{
#ifdef USE_MILK
// Close any Cacao streams that were opened.
for (const auto &[name, stream] : m_CacaoStreams)
ImageStreamIO_closeIm(stream.get());
#endif // USE_MILK
}

void Service::Run(void (*error_check)())
Expand Down Expand Up @@ -361,6 +366,16 @@ std::shared_ptr<DataStream> Service::GetDataStream(const std::string &stream_nam
return nullptr;
}

std::shared_ptr<CacaoStream> Service::GetCacaoStream(const std::string &stream_name) const
{
auto i = m_CacaoStreams.find(stream_name);

if (i != m_CacaoStreams.end())
return i->second;
else
return nullptr;
}

json Service::GetConfig() const
{
return m_Config;
Expand Down Expand Up @@ -410,6 +425,21 @@ std::shared_ptr<DataStream> Service::MakeDataStream(std::string stream_name, Dat
return stream;
}

std::shared_ptr<CacaoStream> Service::UseCacaoStream(std::string stream_name, std::string cacao_fname)
{
#ifndef USE_MILK
throw std::runtime_error("Milk was not installed in this binary.");
#else
auto stream = std::make_shared<CacaoStream>();
ImageStreamIO_openIm(stream.get(), cacao_fname.c_str());
// TODO: perform error check.

m_CacaoStreams[stream_name] = stream;

return stream;
#endif // USE_MILK
}

std::shared_ptr<DataStream> Service::ReuseDataStream(std::string stream_name, std::string stream_id)
{
LOG_DEBUG("Reusing data stream \"" + stream_name + "\".");
Expand Down Expand Up @@ -448,6 +478,11 @@ string Service::HandleGetInfo(const string &data)
for (auto& [key, value] : m_DataStreams)
(*reply.mutable_datastream_ids())[key] = value->GetStreamId();

#ifdef USE_MILK
for (auto& [key, value] : m_CacaoStreams)
(*reply.mutable_cacaostream_ids())[key] = std::string(value->md->name);
#endif // USE_MILK

reply.set_heartbeat_stream_id(m_Heartbeat->GetStreamId());

std::string reply_string;
Expand Down
4 changes: 4 additions & 0 deletions catkit_core/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "Property.h"
#include "Command.h"
#include "DataStream.h"
#include "CacaoStream.h"
#include "LogConsole.h"
#include "LogForwarder.h"
#include "Server.h"
Expand Down Expand Up @@ -42,13 +43,15 @@ class Service
std::shared_ptr<Property> GetProperty(const std::string &property_name) const;
std::shared_ptr<Command> GetCommand(const std::string &command_name) const;
std::shared_ptr<DataStream> GetDataStream(const std::string &stream_name) const;
std::shared_ptr<CacaoStream> GetCacaoStream(const std::string &stream_name) const;

nlohmann::json GetConfig() const;
const std::string &GetId() const;

void MakeProperty(std::string property_name, Property::Getter getter, Property::Setter setter = nullptr, DataType dtype = DataType::DT_UNKNOWN);
void MakeCommand(std::string command_name, Command::CommandFunction func);
std::shared_ptr<DataStream> MakeDataStream(std::string stream_name, DataType type, std::vector<size_t> dimensions, size_t num_frames_in_buffer);
std::shared_ptr<CacaoStream> UseCacaoStream(std::string stream_name, std::string cacao_fname);
std::shared_ptr<DataStream> ReuseDataStream(std::string stream_name, std::string stream_id);

std::shared_ptr<TestbedProxy> GetTestbed();
Expand Down Expand Up @@ -94,6 +97,7 @@ class Service
std::map<std::string, std::shared_ptr<Property>> m_Properties;
std::map<std::string, std::shared_ptr<Command>> m_Commands;
std::map<std::string, std::shared_ptr<DataStream>> m_DataStreams;
std::map<std::string, std::shared_ptr<CacaoStream>> m_CacaoStreams;

LogConsole m_LoggerConsole;
LogForwarder m_LoggerPublish;
Expand Down
54 changes: 54 additions & 0 deletions catkit_core/ServiceProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ServiceProxy::ServiceProxy(std::shared_ptr<TestbedProxy> testbed, std::string se

ServiceProxy::~ServiceProxy()
{
Disconnect();
}

Value ServiceProxy::GetProperty(const std::string &name, void (*error_check)())
Expand Down Expand Up @@ -153,6 +154,35 @@ std::shared_ptr<DataStream> ServiceProxy::GetDataStream(const std::string &name,
return m_DataStreams[name];
}

std::shared_ptr<CacaoStream> ServiceProxy::GetCacaoStream(const std::string &name, void (*error_check)())
{
#ifndef USE_MILK
throw std::runtime_error("Milk was not installed in this binary.");
#else
// Start the service if it has not already been started.
Start(TIMEOUT_TO_START, error_check);

// Check if the name is a valid cacao stream name.
if (m_CacaoStreamIds.find(name) == m_CacaoStreamIds.end())
throw std::runtime_error("This is not a valid cacao stream name.");

auto stream = m_CacaoStreams.find(name);

// Check if we already opened this cacao stream.
if (stream == m_CacaoStreams.end())
{
// Open it now.
auto stream = std::make_shared<CacaoStream>();
ImageStreamIO_openIm(stream.get(), name.c_str());
// TODO: perform error check.

m_CacaoStreams[name] = stream;
}

return m_CacaoStreams[name];
#endif // USE_MILK
}

std::shared_ptr<DataStream> ServiceProxy::GetHeartbeat()
{
return m_Heartbeat;
Expand Down Expand Up @@ -312,6 +342,9 @@ void ServiceProxy::Connect()
for (auto& [key, value] : reply.datastream_ids())
m_DataStreamIds[key] = value;

for (auto& [key, value] : reply.cacaostream_ids())
m_CacaoStreamIds[key] = value;

for (auto& [key, value] : reply.property_datastream_links())
m_PropertyDataStreamLinks[key] = value;

Expand All @@ -327,8 +360,16 @@ void ServiceProxy::Disconnect()
m_PropertyNames.clear();
m_CommandNames.clear();
m_DataStreamIds.clear();
m_CacaoStreamIds.clear();
m_DataStreams.clear();

#ifdef USE_MILK
// Close any Cacao streams that were opened.
for (const auto &[name, stream] : m_CacaoStreams)
ImageStreamIO_closeIm(stream.get());
#endif // USE_MILK
m_CacaoStreams.clear();

m_Heartbeat = nullptr;
}

Expand Down Expand Up @@ -361,6 +402,19 @@ std::vector<std::string> ServiceProxy::GetDataStreamNames(void (*error_check)())
return names;
}

std::vector<std::string> ServiceProxy::GetCacaoStreamNames(void (*error_check)())
{
// Start the service if it has not already been started.
Start(TIMEOUT_TO_START, error_check);

std::vector<std::string> names;

for (auto const &item : m_CacaoStreamIds)
names.push_back(item.first);

return names;
}

nlohmann::json ServiceProxy::GetConfig()
{
return m_Testbed->GetConfig()["services"][m_ServiceId];
Expand Down
6 changes: 6 additions & 0 deletions catkit_core/ServiceProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "Types.h"
#include "DataStream.h"
#include "CacaoStream.h"
#include "ServiceState.h"
#include "Client.h"

Expand All @@ -26,6 +27,8 @@ class ServiceProxy

std::shared_ptr<DataStream> GetDataStream(const std::string &name, void (*error_check)() = nullptr);

std::shared_ptr<CacaoStream> GetCacaoStream(const std::string &name, void (*error_check)() = nullptr);

std::shared_ptr<DataStream> GetHeartbeat();

ServiceState GetState();
Expand All @@ -40,6 +43,7 @@ class ServiceProxy
std::vector<std::string> GetPropertyNames(void (*error_check)() = nullptr);
std::vector<std::string> GetCommandNames(void (*error_check)() = nullptr);
std::vector<std::string> GetDataStreamNames(void (*error_check)() = nullptr);
std::vector<std::string> GetCacaoStreamNames(void (*error_check)() = nullptr);

nlohmann::json GetConfig();
std::string GetId();
Expand All @@ -57,10 +61,12 @@ class ServiceProxy
std::vector<std::string> m_PropertyNames;
std::vector<std::string> m_CommandNames;
std::map<std::string, std::string> m_DataStreamIds;
std::map<std::string, std::string> m_CacaoStreamIds;

std::map<std::string, std::string> m_PropertyDataStreamLinks;

std::map<std::string, std::shared_ptr<DataStream>> m_DataStreams;
std::map<std::string, std::shared_ptr<CacaoStream>> m_CacaoStreams;

std::shared_ptr<DataStream> m_Heartbeat;
std::shared_ptr<DataStream> m_State;
Expand Down
1 change: 1 addition & 0 deletions proto/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ message GetInfoReply
repeated string property_names = 4;
repeated string command_names = 5;
map<string, string> datastream_ids = 6;
map<string, string> cacaostream_ids = 9;

map<string, string> property_datastream_links = 8;

Expand Down
Loading