Skip to content

Commit

Permalink
DataStorm minor fixes (#3280)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Dec 17, 2024
1 parent 3b28f7a commit 05251c8
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 26 deletions.
7 changes: 4 additions & 3 deletions cpp/include/DataStorm/DataStorm.h
Original file line number Diff line number Diff line change
Expand Up @@ -1842,7 +1842,7 @@ namespace DataStorm
return [](const std::string& criteria)
{
std::regex expr(criteria);
return [expr](const Value& value)
return [expr = std::move(expr)](const Value& value)
{
std::ostringstream os;
os << value;
Expand Down Expand Up @@ -1888,8 +1888,9 @@ namespace DataStorm
_topicFactory(node._factory),
_keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
_tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
_keyFilterFactories(DataStormI::FilterManagerT<DataStormI::KeyT<Key>>::create()),
_sampleFilterFactories(DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>::create())
_keyFilterFactories(std::make_shared<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>>()),
_sampleFilterFactories(
std::make_shared<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>>())
{
RegexFilter<Key, Key>::add(_keyFilterFactories);
RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
Expand Down
27 changes: 13 additions & 14 deletions cpp/include/DataStorm/InternalT.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ namespace DataStormI
template<typename K, typename V>
class AbstractFactoryT : public std::enable_shared_from_this<AbstractFactoryT<K, V>>
{
/// A custom deleter to remove the element from the factory when the shared_ptr is deleted.
/// The deleter is used by elements created by the factory.
struct Deleter
{
void operator()(V* obj)
{
auto factory = _factory.lock();
if (factory)
if (auto factory = _factory.lock())
{
factory->remove(obj);
}
Expand All @@ -155,7 +156,10 @@ namespace DataStormI
public:
AbstractFactoryT() : _nextId(1) {}

void init() { _deleter = {std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this()}; }
void init()
{
_deleter = Deleter{._factory = std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this()};
}

template<typename F, typename... Args>
std::shared_ptr<typename V::BaseClassType> create(F&& value, Args&&... args)
Expand Down Expand Up @@ -184,17 +188,15 @@ namespace DataStormI
auto p = _elementsById.find(id);
if (p != _elementsById.end())
{
auto k = p->second.lock();
if (k)
{
return k;
}
return p->second.lock();
}
return nullptr;
}

template<typename F, typename... Args> std::shared_ptr<V> createImpl(F&& value, Args&&... args)
{
// Called with _mutex locked

auto p = _elements.find(value);
if (p != _elements.end())
{
Expand Down Expand Up @@ -515,20 +517,18 @@ namespace DataStormI
};

public:
FilterManagerT() {}

template<typename Criteria> std::shared_ptr<Filter> create(const std::string& name, const Criteria& criteria)
{
auto p = _factories.find(name);
if (p == _factories.end())
{
throw std::invalid_argument("unknown filter `" + name + "'");
throw std::invalid_argument("unknown filter '" + name + "'");
}

auto factory = dynamic_cast<FactoryT<Criteria>*>(p->second.get());
if (!factory)
{
throw std::invalid_argument("filter `" + name + "' type doesn't match");
throw std::invalid_argument("filter '" + name + "' type doesn't match");
}

return factory->create(criteria);
Expand Down Expand Up @@ -571,9 +571,8 @@ namespace DataStormI
}
}

static std::shared_ptr<FilterManagerT<ValueT>> create() { return std::make_shared<FilterManagerT<ValueT>>(); }

private:
// A map containing the filter factories, indexed by the filter name.
std::map<std::string, std::unique_ptr<Factory>> _factories;
};
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1360,12 +1360,12 @@ FilteredDataReaderI::FilteredDataReaderI(
TopicReaderI* topic,
string name,
int64_t id,
const shared_ptr<Filter>& filter,
shared_ptr<Filter> filter,
string sampleFilterName,
Ice::ByteSeq sampleFilterCriteria,
const DataStorm::ReaderConfig& config)
: DataReaderI(topic, std::move(name), id, std::move(sampleFilterName), std::move(sampleFilterCriteria), config),
_filter(filter)
_filter(std::move(filter))
{
if (_traceLevels->data > 0)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/DataStorm/DataElementI.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ namespace DataStormI
TopicReaderI*,
std::string,
std::int64_t,
const std::shared_ptr<Filter>&,
std::shared_ptr<Filter>,
std::string,
Ice::ByteSeq,
const DataStorm::ReaderConfig&);
Expand Down
23 changes: 17 additions & 6 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&)
});
}

// Reap un-visited topics
// Reap dead topics corresponding to subscriptions from a previous session instance ID. Subscribers from the
// previous session instance ID that were not reattached to the new session instance ID are removed.
auto p = _topics.begin();
while (p != _topics.end())
{
Expand Down Expand Up @@ -604,10 +605,19 @@ bool
SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
{
lock_guard<mutex> lock(_mutex);
if (_destroyed || (connection && _connection != connection) || !_session)
if (_destroyed)
{
// Ignore since we are either already destroyed, or disconnected, or a new connection has already been
// established.
// Ignore already destroyed.
return false;
}
else if (connection && _connection != connection)
{
// Ignore the session has already reconnected using a new connection.
return false;
}
else if (!_session)
{
// Ignore if the session is already disconnected.
return false;
}

Expand All @@ -633,8 +643,9 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
}
}

// Detach all topics from the session.
auto self = shared_from_this();
for (auto& t : _topics)
for (const auto& t : _topics)
{
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); });
}
Expand Down Expand Up @@ -886,7 +897,7 @@ SessionI::unsubscribe(int64_t id, TopicI* topic)
{
for (auto& [element, elementSubscriber] : elementSubscribers.getSubscribers())
{
for (auto key : elementSubscriber.keys)
for (const auto& key : elementSubscriber.keys)
{
if (elementId > 0)
{
Expand Down

0 comments on commit 05251c8

Please sign in to comment.