diff --git a/catkit2/testbed/service_proxy.py b/catkit2/testbed/service_proxy.py index 7da25aef..ef007a55 100644 --- a/catkit2/testbed/service_proxy.py +++ b/catkit2/testbed/service_proxy.py @@ -25,11 +25,26 @@ def __init__(self, testbed, service_id): # The import is here instead of at the top of the file to avoid circular imports. from .testbed_proxy import TestbedProxy # noqa: E402 object.__setattr__(self, '_testbed', TestbedProxy(getattr(super(), 'testbed').host, getattr(super(), 'testbed').port)) + object.__setattr__(self, '_cacao_streams', {}) @property def testbed(self): return self._testbed + def get_cacao_stream(self, name): + try: + from pyMilk.interfacing.isio_shmlib import SHM + + # Load cacao stream if we didn't already open it. + if name not in self._cacao_streams: + shm_name = self.cacao_stream_names[name] + stream = SHM(shm_name) + self._cacao_streams[name] = stream + + return self._cacao_streams[name] + except ImportError as e: + raise RuntimeError("pyMilk install is required for using CACAO streams.") from e + def __getattr__(self, name): '''Get a property, command or data stream. @@ -59,6 +74,9 @@ def cmd(**kwargs): elif name in self.data_stream_names: # Return datastream. return self.get_data_stream(name) + elif name in self.cacao_stream_names: + # Return cacaostream + return self.get_cacao_stream(name) else: raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'.") @@ -85,6 +103,8 @@ def __setattr__(self, name, value): raise AttributeError('Cannot set a command.') elif name in self.data_stream_names: raise AttributeError('Cannot set a data stream. Did you mean .submit_data()?') + elif name in self.cacao_stream_names: + raise AttributeError('Cannot set a cacao stream. Did you mean .set_data()?') else: super().__setattr__(name, value) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index af141bdd..b928fa92 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -78,6 +78,13 @@ find_package(Protobuf REQUIRED) target_include_directories(catkit_core PUBLIC ${PROTOBUF_INCLUDE_DIR}) target_link_libraries(catkit_core PUBLIC ${PROTOBUF_LIBRARY}) +# Optionally link Milk +if(USE_MILK) + target_include_directories(catkit_core ${MILK_INSTALLDIR}/include) + target_link_directories(catkit_core ${MILK_INSTALLDIR}/lib) + target_link_libraries(catkit_core PUBLIC ImageStreamIO) +endif(USE_MILK) + # Add install files install(TARGETS catkit_core DESTINATION lib) install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR} diff --git a/catkit_core/CacaoStream.h b/catkit_core/CacaoStream.h new file mode 100644 index 00000000..71705777 --- /dev/null +++ b/catkit_core/CacaoStream.h @@ -0,0 +1,16 @@ +#ifndef CACAO_STREAM_H +#define CACAO_STREAM_H + +#ifdef USE_MILK + +#include "ImageStreamIO.h" +using CacaoStream = IMAGE; + +#else + +// Use a placeholder for the CacaoStream struct. +using CacaoStream = void; + +#endif + +#endif // CACAO_STREAM_H diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index e472d251..89f4fdb5 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -61,6 +61,11 @@ Service::Service(string service_type, string service_id, int service_port, int t Service::~Service() { +#ifdef USE_MILK + // Close any Cacao streams that were opened. + for (const auto &[name, stream] : m_CacaoStreams) + ImageStreamIO_closeIm(stream.get()); +#endif // USE_MILK } void Service::Run(void (*error_check)()) @@ -361,6 +366,16 @@ std::shared_ptr Service::GetDataStream(const std::string &stream_nam return nullptr; } +std::shared_ptr Service::GetCacaoStream(const std::string &stream_name) const +{ + auto i = m_CacaoStreams.find(stream_name); + + if (i != m_CacaoStreams.end()) + return i->second; + else + return nullptr; +} + json Service::GetConfig() const { return m_Config; @@ -410,6 +425,21 @@ std::shared_ptr Service::MakeDataStream(std::string stream_name, Dat return stream; } +std::shared_ptr Service::UseCacaoStream(std::string stream_name, std::string cacao_fname) +{ +#ifndef USE_MILK + throw std::runtime_error("Milk was not installed in this binary."); +#else + auto stream = std::make_shared(); + ImageStreamIO_openIm(stream.get(), cacao_fname.c_str()); + // TODO: perform error check. + + m_CacaoStreams[stream_name] = stream; + + return stream; +#endif // USE_MILK +} + std::shared_ptr Service::ReuseDataStream(std::string stream_name, std::string stream_id) { LOG_DEBUG("Reusing data stream \"" + stream_name + "\"."); @@ -448,6 +478,11 @@ string Service::HandleGetInfo(const string &data) for (auto& [key, value] : m_DataStreams) (*reply.mutable_datastream_ids())[key] = value->GetStreamId(); +#ifdef USE_MILK + for (auto& [key, value] : m_CacaoStreams) + (*reply.mutable_cacaostream_ids())[key] = std::string(value->md->name); +#endif // USE_MILK + reply.set_heartbeat_stream_id(m_Heartbeat->GetStreamId()); std::string reply_string; diff --git a/catkit_core/Service.h b/catkit_core/Service.h index 8c878f17..78c9409f 100644 --- a/catkit_core/Service.h +++ b/catkit_core/Service.h @@ -12,6 +12,7 @@ #include "Property.h" #include "Command.h" #include "DataStream.h" +#include "CacaoStream.h" #include "LogConsole.h" #include "LogForwarder.h" #include "Server.h" @@ -42,6 +43,7 @@ class Service std::shared_ptr GetProperty(const std::string &property_name) const; std::shared_ptr GetCommand(const std::string &command_name) const; std::shared_ptr GetDataStream(const std::string &stream_name) const; + std::shared_ptr GetCacaoStream(const std::string &stream_name) const; nlohmann::json GetConfig() const; const std::string &GetId() const; @@ -49,6 +51,7 @@ class Service void MakeProperty(std::string property_name, Property::Getter getter, Property::Setter setter = nullptr, DataType dtype = DataType::DT_UNKNOWN); void MakeCommand(std::string command_name, Command::CommandFunction func); std::shared_ptr MakeDataStream(std::string stream_name, DataType type, std::vector dimensions, size_t num_frames_in_buffer); + std::shared_ptr UseCacaoStream(std::string stream_name, std::string cacao_fname); std::shared_ptr ReuseDataStream(std::string stream_name, std::string stream_id); std::shared_ptr GetTestbed(); @@ -94,6 +97,7 @@ class Service std::map> m_Properties; std::map> m_Commands; std::map> m_DataStreams; + std::map> m_CacaoStreams; LogConsole m_LoggerConsole; LogForwarder m_LoggerPublish; diff --git a/catkit_core/ServiceProxy.cpp b/catkit_core/ServiceProxy.cpp index 53629f88..af30e491 100644 --- a/catkit_core/ServiceProxy.cpp +++ b/catkit_core/ServiceProxy.cpp @@ -33,6 +33,7 @@ ServiceProxy::ServiceProxy(std::shared_ptr testbed, std::string se ServiceProxy::~ServiceProxy() { + Disconnect(); } Value ServiceProxy::GetProperty(const std::string &name, void (*error_check)()) @@ -153,6 +154,35 @@ std::shared_ptr ServiceProxy::GetDataStream(const std::string &name, return m_DataStreams[name]; } +std::shared_ptr ServiceProxy::GetCacaoStream(const std::string &name, void (*error_check)()) +{ +#ifndef USE_MILK + throw std::runtime_error("Milk was not installed in this binary."); +#else + // Start the service if it has not already been started. + Start(TIMEOUT_TO_START, error_check); + + // Check if the name is a valid cacao stream name. + if (m_CacaoStreamIds.find(name) == m_CacaoStreamIds.end()) + throw std::runtime_error("This is not a valid cacao stream name."); + + auto stream = m_CacaoStreams.find(name); + + // Check if we already opened this cacao stream. + if (stream == m_CacaoStreams.end()) + { + // Open it now. + auto stream = std::make_shared(); + ImageStreamIO_openIm(stream.get(), name.c_str()); + // TODO: perform error check. + + m_CacaoStreams[name] = stream; + } + + return m_CacaoStreams[name]; +#endif // USE_MILK +} + std::shared_ptr ServiceProxy::GetHeartbeat() { return m_Heartbeat; @@ -312,6 +342,9 @@ void ServiceProxy::Connect() for (auto& [key, value] : reply.datastream_ids()) m_DataStreamIds[key] = value; + for (auto& [key, value] : reply.cacaostream_ids()) + m_CacaoStreamIds[key] = value; + for (auto& [key, value] : reply.property_datastream_links()) m_PropertyDataStreamLinks[key] = value; @@ -327,8 +360,16 @@ void ServiceProxy::Disconnect() m_PropertyNames.clear(); m_CommandNames.clear(); m_DataStreamIds.clear(); + m_CacaoStreamIds.clear(); m_DataStreams.clear(); +#ifdef USE_MILK + // Close any Cacao streams that were opened. + for (const auto &[name, stream] : m_CacaoStreams) + ImageStreamIO_closeIm(stream.get()); +#endif // USE_MILK + m_CacaoStreams.clear(); + m_Heartbeat = nullptr; } @@ -361,6 +402,19 @@ std::vector ServiceProxy::GetDataStreamNames(void (*error_check)()) return names; } +std::vector ServiceProxy::GetCacaoStreamNames(void (*error_check)()) +{ + // Start the service if it has not already been started. + Start(TIMEOUT_TO_START, error_check); + + std::vector names; + + for (auto const &item : m_CacaoStreamIds) + names.push_back(item.first); + + return names; +} + nlohmann::json ServiceProxy::GetConfig() { return m_Testbed->GetConfig()["services"][m_ServiceId]; diff --git a/catkit_core/ServiceProxy.h b/catkit_core/ServiceProxy.h index 34f77f3b..0f7a5688 100644 --- a/catkit_core/ServiceProxy.h +++ b/catkit_core/ServiceProxy.h @@ -3,6 +3,7 @@ #include "Types.h" #include "DataStream.h" +#include "CacaoStream.h" #include "ServiceState.h" #include "Client.h" @@ -26,6 +27,8 @@ class ServiceProxy std::shared_ptr GetDataStream(const std::string &name, void (*error_check)() = nullptr); + std::shared_ptr GetCacaoStream(const std::string &name, void (*error_check)() = nullptr); + std::shared_ptr GetHeartbeat(); ServiceState GetState(); @@ -40,6 +43,7 @@ class ServiceProxy std::vector GetPropertyNames(void (*error_check)() = nullptr); std::vector GetCommandNames(void (*error_check)() = nullptr); std::vector GetDataStreamNames(void (*error_check)() = nullptr); + std::vector GetCacaoStreamNames(void (*error_check)() = nullptr); nlohmann::json GetConfig(); std::string GetId(); @@ -57,10 +61,12 @@ class ServiceProxy std::vector m_PropertyNames; std::vector m_CommandNames; std::map m_DataStreamIds; + std::map m_CacaoStreamIds; std::map m_PropertyDataStreamLinks; std::map> m_DataStreams; + std::map> m_CacaoStreams; std::shared_ptr m_Heartbeat; std::shared_ptr m_State; diff --git a/proto/service.proto b/proto/service.proto index c6e0a051..6bdbb6dc 100644 --- a/proto/service.proto +++ b/proto/service.proto @@ -17,6 +17,7 @@ message GetInfoReply repeated string property_names = 4; repeated string command_names = 5; map datastream_ids = 6; + map cacaostream_ids = 9; map property_datastream_links = 8;