Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStorm minor fixes #3280

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cpp/include/DataStorm/DataStorm.h
Original file line number Diff line number Diff line change
Expand Up @@ -1842,7 +1842,7 @@ namespace DataStorm
return [](const std::string& criteria)
{
std::regex expr(criteria);
return [expr](const Value& value)
return [expr = std::move(expr)](const Value& value)
{
std::ostringstream os;
os << value;
Expand Down Expand Up @@ -1888,8 +1888,9 @@ namespace DataStorm
_topicFactory(node._factory),
_keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
_tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
_keyFilterFactories(DataStormI::FilterManagerT<DataStormI::KeyT<Key>>::create()),
_sampleFilterFactories(DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>::create())
_keyFilterFactories(std::make_shared<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>>()),
_sampleFilterFactories(
std::make_shared<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>>())
{
RegexFilter<Key, Key>::add(_keyFilterFactories);
RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
Expand Down
27 changes: 13 additions & 14 deletions cpp/include/DataStorm/InternalT.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ namespace DataStormI
template<typename K, typename V>
class AbstractFactoryT : public std::enable_shared_from_this<AbstractFactoryT<K, V>>
{
/// A custom deleter to remove the element from the factory when the shared_ptr is deleted.
/// The deleter is used by elements created by the factory.
struct Deleter
{
void operator()(V* obj)
{
auto factory = _factory.lock();
if (factory)
if (auto factory = _factory.lock())
{
factory->remove(obj);
}
Expand All @@ -155,7 +156,10 @@ namespace DataStormI
public:
AbstractFactoryT() : _nextId(1) {}

void init() { _deleter = {std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this()}; }
void init()
{
_deleter = Deleter{._factory = std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this()};
}

template<typename F, typename... Args>
std::shared_ptr<typename V::BaseClassType> create(F&& value, Args&&... args)
Expand Down Expand Up @@ -184,17 +188,15 @@ namespace DataStormI
auto p = _elementsById.find(id);
if (p != _elementsById.end())
{
auto k = p->second.lock();
if (k)
{
return k;
}
return p->second.lock();
}
return nullptr;
}

template<typename F, typename... Args> std::shared_ptr<V> createImpl(F&& value, Args&&... args)
{
// Called with _mutex locked

auto p = _elements.find(value);
if (p != _elements.end())
{
Expand Down Expand Up @@ -515,20 +517,18 @@ namespace DataStormI
};

public:
FilterManagerT() {}

template<typename Criteria> std::shared_ptr<Filter> create(const std::string& name, const Criteria& criteria)
{
auto p = _factories.find(name);
if (p == _factories.end())
{
throw std::invalid_argument("unknown filter `" + name + "'");
throw std::invalid_argument("unknown filter '" + name + "'");
}

auto factory = dynamic_cast<FactoryT<Criteria>*>(p->second.get());
if (!factory)
{
throw std::invalid_argument("filter `" + name + "' type doesn't match");
throw std::invalid_argument("filter '" + name + "' type doesn't match");
}

return factory->create(criteria);
Expand Down Expand Up @@ -571,9 +571,8 @@ namespace DataStormI
}
}

static std::shared_ptr<FilterManagerT<ValueT>> create() { return std::make_shared<FilterManagerT<ValueT>>(); }

private:
// A map containing the filter factories, indexed by the filter name.
std::map<std::string, std::unique_ptr<Factory>> _factories;
};
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1360,12 +1360,12 @@ FilteredDataReaderI::FilteredDataReaderI(
TopicReaderI* topic,
string name,
int64_t id,
const shared_ptr<Filter>& filter,
shared_ptr<Filter> filter,
string sampleFilterName,
Ice::ByteSeq sampleFilterCriteria,
const DataStorm::ReaderConfig& config)
: DataReaderI(topic, std::move(name), id, std::move(sampleFilterName), std::move(sampleFilterCriteria), config),
_filter(filter)
_filter(std::move(filter))
{
if (_traceLevels->data > 0)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/DataStorm/DataElementI.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ namespace DataStormI
TopicReaderI*,
std::string,
std::int64_t,
const std::shared_ptr<Filter>&,
std::shared_ptr<Filter>,
std::string,
Ice::ByteSeq,
const DataStorm::ReaderConfig&);
Expand Down
23 changes: 17 additions & 6 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&)
});
}

// Reap un-visited topics
// Reap dead topics corresponding to subscriptions from a previous session instance ID. Subscribers from the
// previous session instance ID that were not reattached to the new session instance ID are removed.
auto p = _topics.begin();
while (p != _topics.end())
{
Expand Down Expand Up @@ -604,10 +605,19 @@ bool
SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
{
lock_guard<mutex> lock(_mutex);
if (_destroyed || (connection && _connection != connection) || !_session)
if (_destroyed)
{
// Ignore since we are either already destroyed, or disconnected, or a new connection has already been
// established.
// Ignore already destroyed.
return false;
}
else if (connection && _connection != connection)
{
// Ignore the session has already reconnected using a new connection.
return false;
}
else if (!_session)
{
// Ignore if the session is already disconnected.
return false;
}

Expand All @@ -633,8 +643,9 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
}
}

// Detach all topics from the session.
auto self = shared_from_this();
for (auto& t : _topics)
for (const auto& t : _topics)
{
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); });
}
Expand Down Expand Up @@ -886,7 +897,7 @@ SessionI::unsubscribe(int64_t id, TopicI* topic)
{
for (auto& [element, elementSubscriber] : elementSubscribers.getSubscribers())
{
for (auto key : elementSubscriber.keys)
for (const auto& key : elementSubscriber.keys)
{
if (elementId > 0)
{
Expand Down
Loading