From 961bda11fe9d78732bc8d71a4359cc5508fb3c15 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 7 Nov 2024 11:50:13 -0800 Subject: [PATCH 1/5] Add CACAO streams to Services. --- catkit_core/Service.cpp | 30 ++++++++++++++++++++++++++++++ catkit_core/Service.h | 4 ++++ 2 files changed, 34 insertions(+) diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index e472d2518..c5dd3cace 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -61,6 +61,11 @@ Service::Service(string service_type, string service_id, int service_port, int t Service::~Service() { +#ifdef USE_MILK + // Close any Cacao streams that were opened. + for (const auto &[name, stream] : m_CacaoStreams) + ImageStreamIO_closeIm(stream.get()); +#endif // USE_MILK } void Service::Run(void (*error_check)()) @@ -361,6 +366,16 @@ std::shared_ptr Service::GetDataStream(const std::string &stream_nam return nullptr; } +std::shared_ptr Service::GetCacaoStream(const std::string &stream_name) const +{ + auto i = m_CacaoStreams.find(stream_name); + + if (i != m_CacaoStreams.end()) + return i->second; + else + return nullptr; +} + json Service::GetConfig() const { return m_Config; @@ -410,6 +425,21 @@ std::shared_ptr Service::MakeDataStream(std::string stream_name, Dat return stream; } +std::shared_ptr Service::UseCacaoStream(std::string stream_name, std::string cacao_fname) +{ +#ifndef USE_MILK + throw std::runtime_error("Milk was not installed in this binary."); +#else + auto stream = std::make_shared(); + ImageStreamIO_openIm(stream.get(), cacao_fname.c_str()); + // TODO: perform error check. + + m_CacaoStreams[stream_name] = stream; + + return stream; +#endif // USE_MILK +} + std::shared_ptr Service::ReuseDataStream(std::string stream_name, std::string stream_id) { LOG_DEBUG("Reusing data stream \"" + stream_name + "\"."); diff --git a/catkit_core/Service.h b/catkit_core/Service.h index 8c878f172..78c9409f5 100644 --- a/catkit_core/Service.h +++ b/catkit_core/Service.h @@ -12,6 +12,7 @@ #include "Property.h" #include "Command.h" #include "DataStream.h" +#include "CacaoStream.h" #include "LogConsole.h" #include "LogForwarder.h" #include "Server.h" @@ -42,6 +43,7 @@ class Service std::shared_ptr GetProperty(const std::string &property_name) const; std::shared_ptr GetCommand(const std::string &command_name) const; std::shared_ptr GetDataStream(const std::string &stream_name) const; + std::shared_ptr GetCacaoStream(const std::string &stream_name) const; nlohmann::json GetConfig() const; const std::string &GetId() const; @@ -49,6 +51,7 @@ class Service void MakeProperty(std::string property_name, Property::Getter getter, Property::Setter setter = nullptr, DataType dtype = DataType::DT_UNKNOWN); void MakeCommand(std::string command_name, Command::CommandFunction func); std::shared_ptr MakeDataStream(std::string stream_name, DataType type, std::vector dimensions, size_t num_frames_in_buffer); + std::shared_ptr UseCacaoStream(std::string stream_name, std::string cacao_fname); std::shared_ptr ReuseDataStream(std::string stream_name, std::string stream_id); std::shared_ptr GetTestbed(); @@ -94,6 +97,7 @@ class Service std::map> m_Properties; std::map> m_Commands; std::map> m_DataStreams; + std::map> m_CacaoStreams; LogConsole m_LoggerConsole; LogForwarder m_LoggerPublish; From 2c865be4bc3327683d9989a22dd7ea57ab0bef22 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 7 Nov 2024 13:03:28 -0800 Subject: [PATCH 2/5] Update service proxy with the cacao streams. --- catkit2/testbed/service_proxy.py | 5 +++ catkit_core/CacaoStream.h | 16 ++++++++++ catkit_core/Service.cpp | 3 ++ catkit_core/ServiceProxy.cpp | 54 ++++++++++++++++++++++++++++++++ catkit_core/ServiceProxy.h | 6 ++++ proto/service.proto | 1 + 6 files changed, 85 insertions(+) create mode 100644 catkit_core/CacaoStream.h diff --git a/catkit2/testbed/service_proxy.py b/catkit2/testbed/service_proxy.py index 7da25aef9..18119a8af 100644 --- a/catkit2/testbed/service_proxy.py +++ b/catkit2/testbed/service_proxy.py @@ -59,6 +59,9 @@ def cmd(**kwargs): elif name in self.data_stream_names: # Return datastream. return self.get_data_stream(name) + elif name in self.cacao_stream_names: + # Return cacaostream + return self.get_cacao_stream(name) else: raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'.") @@ -85,6 +88,8 @@ def __setattr__(self, name, value): raise AttributeError('Cannot set a command.') elif name in self.data_stream_names: raise AttributeError('Cannot set a data stream. Did you mean .submit_data()?') + elif name in self.cacao_stream_names: + raise AttributeError('Cannot set a cacao stream. Did you mean .set_data()?') else: super().__setattr__(name, value) diff --git a/catkit_core/CacaoStream.h b/catkit_core/CacaoStream.h new file mode 100644 index 000000000..717057779 --- /dev/null +++ b/catkit_core/CacaoStream.h @@ -0,0 +1,16 @@ +#ifndef CACAO_STREAM_H +#define CACAO_STREAM_H + +#ifdef USE_MILK + +#include "ImageStreamIO.h" +using CacaoStream = IMAGE; + +#else + +// Use a placeholder for the CacaoStream struct. +using CacaoStream = void; + +#endif + +#endif // CACAO_STREAM_H diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index c5dd3cace..6f2216dda 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -478,6 +478,9 @@ string Service::HandleGetInfo(const string &data) for (auto& [key, value] : m_DataStreams) (*reply.mutable_datastream_ids())[key] = value->GetStreamId(); + for (auto& [key, value] : m_CacaoStreams) + (*reply.mutable_cacaostream_ids())[key] = std::string(value->md->name); + reply.set_heartbeat_stream_id(m_Heartbeat->GetStreamId()); std::string reply_string; diff --git a/catkit_core/ServiceProxy.cpp b/catkit_core/ServiceProxy.cpp index 53629f88d..f002e3c56 100644 --- a/catkit_core/ServiceProxy.cpp +++ b/catkit_core/ServiceProxy.cpp @@ -33,6 +33,7 @@ ServiceProxy::ServiceProxy(std::shared_ptr testbed, std::string se ServiceProxy::~ServiceProxy() { + Disconnect(); } Value ServiceProxy::GetProperty(const std::string &name, void (*error_check)()) @@ -153,6 +154,35 @@ std::shared_ptr ServiceProxy::GetDataStream(const std::string &name, return m_DataStreams[name]; } +std::shared_ptr ServiceProxy::GetCacaoStream(const std::string &name, void (*error_check)()) +{ +#ifndef USE_MILK + throw std::runtime_error("Milk was not installed in this binary."); +#else + // Start the service if it has not already been started. + Start(TIMEOUT_TO_START, error_check); + + // Check if the name is a valid cacao stream name. + if (m_CacaoStreamIds.find(name) == m_CacaoStreamIds.end()) + throw std::runtime_error("This is not a valid cacao stream name."); + + auto stream = m_CacaoStreams.find(name); + + // Check if we already opened this cacao stream. + if (stream == m_CacaoStreams.end()) + { + // Open it now. + auto stream = std::make_shared(); + ImageStreamIO_openIm(stream.get(), name.c_str()); + // TODO: perform error check. + + m_CacaoStreams[name] = stream; + } + + return m_CacaoStreams[name]; +#endif // USE_MILK +} + std::shared_ptr ServiceProxy::GetHeartbeat() { return m_Heartbeat; @@ -312,6 +342,9 @@ void ServiceProxy::Connect() for (auto& [key, value] : reply.datastream_ids()) m_DataStreamIds[key] = value; + for (auto& [keym, value] : reply.cacaostream_ids()) + m_CacaoStreamIds[key] = value; + for (auto& [key, value] : reply.property_datastream_links()) m_PropertyDataStreamLinks[key] = value; @@ -327,8 +360,16 @@ void ServiceProxy::Disconnect() m_PropertyNames.clear(); m_CommandNames.clear(); m_DataStreamIds.clear(); + m_CacaoStreamIds.clear(); m_DataStreams.clear(); +#ifdef USE_MILK + // Close any Cacao streams that were opened. + for (const auto &[name, stream] : m_CacaoStreams) + ImageStreamIO_closeIm(stream.get()); +#endif // USE_MILK + m_CacaoStreams.clear(); + m_Heartbeat = nullptr; } @@ -361,6 +402,19 @@ std::vector ServiceProxy::GetDataStreamNames(void (*error_check)()) return names; } +std::vector ServiceProxy::GetCacaoStreamNames(void (*error_check())) +{ + // Start the service if it has not already been started. + Start(TIMEOUT_T_START, error_check); + + std::vectorGetConfig()["services"][m_ServiceId]; diff --git a/catkit_core/ServiceProxy.h b/catkit_core/ServiceProxy.h index 34f77f3b3..c98d573e6 100644 --- a/catkit_core/ServiceProxy.h +++ b/catkit_core/ServiceProxy.h @@ -3,6 +3,7 @@ #include "Types.h" #include "DataStream.h" +#include "CacaoStream.h" #include "ServiceState.h" #include "Client.h" @@ -26,6 +27,8 @@ class ServiceProxy std::shared_ptr GetDataStream(const std::string &name, void (*error_check)() = nullptr); + std::shared_ptr GetCacaoStream(const std::string &name, void (*error_check)() = nullptr); + std::shared_ptr GetHeartbeat(); ServiceState GetState(); @@ -40,6 +43,7 @@ class ServiceProxy std::vector GetPropertyNames(void (*error_check)() = nullptr); std::vector GetCommandNames(void (*error_check)() = nullptr); std::vector GetDataStreamNames(void (*error_check)() = nullptr); + std:;vector GetCacaoStreamNames(void (*error_check)() = nullptr); nlohmann::json GetConfig(); std::string GetId(); @@ -57,10 +61,12 @@ class ServiceProxy std::vector m_PropertyNames; std::vector m_CommandNames; std::map m_DataStreamIds; + std::map m_CacaoStreamIds; std::map m_PropertyDataStreamLinks; std::map> m_DataStreams; + std::map> m_CacaoStreams; std::shared_ptr m_Heartbeat; std::shared_ptr m_State; diff --git a/proto/service.proto b/proto/service.proto index c6e0a051d..6bdbb6dcb 100644 --- a/proto/service.proto +++ b/proto/service.proto @@ -17,6 +17,7 @@ message GetInfoReply repeated string property_names = 4; repeated string command_names = 5; map datastream_ids = 6; + map cacaostream_ids = 9; map property_datastream_links = 8; From 34be5c4bd197927379d5d343ab5b2a65bcedcc05 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 7 Nov 2024 13:09:48 -0800 Subject: [PATCH 3/5] Fix typos. --- catkit_core/Service.cpp | 2 ++ catkit_core/ServiceProxy.cpp | 10 +++++----- catkit_core/ServiceProxy.h | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index 6f2216dda..89f4fdb53 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -478,8 +478,10 @@ string Service::HandleGetInfo(const string &data) for (auto& [key, value] : m_DataStreams) (*reply.mutable_datastream_ids())[key] = value->GetStreamId(); +#ifdef USE_MILK for (auto& [key, value] : m_CacaoStreams) (*reply.mutable_cacaostream_ids())[key] = std::string(value->md->name); +#endif // USE_MILK reply.set_heartbeat_stream_id(m_Heartbeat->GetStreamId()); diff --git a/catkit_core/ServiceProxy.cpp b/catkit_core/ServiceProxy.cpp index f002e3c56..af30e491b 100644 --- a/catkit_core/ServiceProxy.cpp +++ b/catkit_core/ServiceProxy.cpp @@ -342,7 +342,7 @@ void ServiceProxy::Connect() for (auto& [key, value] : reply.datastream_ids()) m_DataStreamIds[key] = value; - for (auto& [keym, value] : reply.cacaostream_ids()) + for (auto& [key, value] : reply.cacaostream_ids()) m_CacaoStreamIds[key] = value; for (auto& [key, value] : reply.property_datastream_links()) @@ -402,14 +402,14 @@ std::vector ServiceProxy::GetDataStreamNames(void (*error_check)()) return names; } -std::vector ServiceProxy::GetCacaoStreamNames(void (*error_check())) +std::vector ServiceProxy::GetCacaoStreamNames(void (*error_check)()) { // Start the service if it has not already been started. - Start(TIMEOUT_T_START, error_check); + Start(TIMEOUT_TO_START, error_check); - std::vector names; - for auto const &item : m_CacaoStreamIds) + for (auto const &item : m_CacaoStreamIds) names.push_back(item.first); return names; diff --git a/catkit_core/ServiceProxy.h b/catkit_core/ServiceProxy.h index c98d573e6..0f7a56888 100644 --- a/catkit_core/ServiceProxy.h +++ b/catkit_core/ServiceProxy.h @@ -43,7 +43,7 @@ class ServiceProxy std::vector GetPropertyNames(void (*error_check)() = nullptr); std::vector GetCommandNames(void (*error_check)() = nullptr); std::vector GetDataStreamNames(void (*error_check)() = nullptr); - std:;vector GetCacaoStreamNames(void (*error_check)() = nullptr); + std::vector GetCacaoStreamNames(void (*error_check)() = nullptr); nlohmann::json GetConfig(); std::string GetId(); From ae80cfb5a4d2ef9ec2b90d08491865482ca809e3 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 7 Nov 2024 13:09:58 -0800 Subject: [PATCH 4/5] Add Milk dependency. --- catkit_core/CMakeLists.txt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index af141bddb..b928fa924 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -78,6 +78,13 @@ find_package(Protobuf REQUIRED) target_include_directories(catkit_core PUBLIC ${PROTOBUF_INCLUDE_DIR}) target_link_libraries(catkit_core PUBLIC ${PROTOBUF_LIBRARY}) +# Optionally link Milk +if(USE_MILK) + target_include_directories(catkit_core ${MILK_INSTALLDIR}/include) + target_link_directories(catkit_core ${MILK_INSTALLDIR}/lib) + target_link_libraries(catkit_core PUBLIC ImageStreamIO) +endif(USE_MILK) + # Add install files install(TARGETS catkit_core DESTINATION lib) install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR} From 7c6d5910781d03cd6bdfdc84ec75c7e3f93dd1a0 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 12 Nov 2024 15:06:58 -0800 Subject: [PATCH 5/5] Use pyMilk for reading CACAO streams on Python. --- catkit2/testbed/service_proxy.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/catkit2/testbed/service_proxy.py b/catkit2/testbed/service_proxy.py index 18119a8af..ef007a550 100644 --- a/catkit2/testbed/service_proxy.py +++ b/catkit2/testbed/service_proxy.py @@ -25,11 +25,26 @@ def __init__(self, testbed, service_id): # The import is here instead of at the top of the file to avoid circular imports. from .testbed_proxy import TestbedProxy # noqa: E402 object.__setattr__(self, '_testbed', TestbedProxy(getattr(super(), 'testbed').host, getattr(super(), 'testbed').port)) + object.__setattr__(self, '_cacao_streams', {}) @property def testbed(self): return self._testbed + def get_cacao_stream(self, name): + try: + from pyMilk.interfacing.isio_shmlib import SHM + + # Load cacao stream if we didn't already open it. + if name not in self._cacao_streams: + shm_name = self.cacao_stream_names[name] + stream = SHM(shm_name) + self._cacao_streams[name] = stream + + return self._cacao_streams[name] + except ImportError as e: + raise RuntimeError("pyMilk install is required for using CACAO streams.") from e + def __getattr__(self, name): '''Get a property, command or data stream.