Skip to content

Commit

Permalink
Revert
Browse files Browse the repository at this point in the history
  • Loading branch information
frmdstryr committed Feb 2, 2025
1 parent e799ac3 commit 468b0c0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
57 changes: 32 additions & 25 deletions atom/src/observerpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ namespace

struct BaseTask : public ModifyTask
{
BaseTask( ObserverPool* pool, cppy::ptr& topic, cppy::ptr& observer ) :
m_pool( pool ), m_topic( topic ), m_observer( observer ) {}
BaseTask( ObserverPool* pool, PyObject* topic, PyObject* observer ) :
m_pool( pool ), m_topic( cppy::incref( topic ) ), m_observer( cppy::incref( observer ) ) {}
ObserverPool* m_pool;
cppy::ptr m_topic;
cppy::ptr m_observer;
Expand All @@ -28,7 +28,7 @@ struct BaseTask : public ModifyTask

struct AddTask : public BaseTask
{
AddTask( ObserverPool* pool, cppy::ptr& topic, cppy::ptr& observer, uint8_t change_types ) :
AddTask( ObserverPool* pool, PyObject* topic, PyObject* observer, uint8_t change_types ) :
BaseTask( pool, topic, observer ), m_change_types(change_types) {}
void run() { m_pool->add( m_topic, m_observer, m_change_types ); }
uint8_t m_change_types;
Expand All @@ -37,15 +37,15 @@ struct AddTask : public BaseTask

struct RemoveTask : public BaseTask
{
RemoveTask( ObserverPool* pool, cppy::ptr& topic, cppy::ptr& observer ) :
RemoveTask( ObserverPool* pool, PyObject* topic, PyObject* observer ) :
BaseTask( pool, topic, observer ) {}
void run() { m_pool->remove( m_topic, m_observer ); }
};

struct RemoveTopicTask : ModifyTask
{
RemoveTopicTask( ObserverPool* pool, cppy::ptr& topic ) :
m_pool( pool ), m_topic( topic ) {}
RemoveTopicTask( ObserverPool* pool, PyObject* topic ) :
m_pool( pool ), m_topic( cppy::incref( topic ) ) {}
void run() { m_pool->remove( m_topic ); }
ObserverPool* m_pool;
cppy::ptr m_topic;
Expand Down Expand Up @@ -117,15 +117,14 @@ ObserverPool::has_observer( cppy::ptr& topic, cppy::ptr& observer, uint8_t chang
void
ObserverPool::add( cppy::ptr& topic, cppy::ptr& observer, uint8_t change_types )
{
if ( !observer.is_truthy() )
return; // A deferred add on a dead atom ?

if( m_modify_guard )
{
ModifyTask* task = new AddTask( this, topic, observer, change_types );
ModifyTask* task = new AddTask( this, topic.get(), observer.get(), change_types );
m_modify_guard->add_task( task );
return;
}
if ( !observer.is_truthy() )
return; // A deferred add on a dead atom ?
uint32_t obs_offset = 0;
std::vector<Topic>::iterator topic_it;
std::vector<Topic>::iterator topic_end = m_topics.end();
Expand All @@ -151,17 +150,20 @@ ObserverPool::add( cppy::ptr& topic, cppy::ptr& observer, uint8_t change_types )
}
if( obs_free == obs_end )
{
m_observers.insert( obs_end, Observer( observer, change_types ) );
m_observers.emplace( obs_end, observer, change_types );
++topic_it->m_count;
}
else
*obs_free = Observer( observer, change_types );
{
obs_free->m_observer = observer;
obs_free->m_change_types = change_types;
}
return;
}
obs_offset += topic_it->m_count;
}
m_topics.push_back( Topic( topic, 1 ) );
m_observers.push_back( Observer(observer, change_types) );
m_topics.emplace_back( topic, 1 );
m_observers.emplace_back( observer, change_types );
}


Expand All @@ -170,7 +172,7 @@ ObserverPool::remove( cppy::ptr& topic, cppy::ptr& observer )
{
if( m_modify_guard )
{
ModifyTask* task = new RemoveTask( this, topic, observer );
ModifyTask* task = new RemoveTask( this, topic.get(), observer.get() );
m_modify_guard->add_task( task );
return;
}
Expand Down Expand Up @@ -207,7 +209,7 @@ ObserverPool::remove( cppy::ptr& topic )
{
if( m_modify_guard )
{
ModifyTask* task = new RemoveTopicTask( this, topic );
ModifyTask* task = new RemoveTopicTask( this, topic.get() );
m_modify_guard->add_task( task );
return;
}
Expand Down Expand Up @@ -276,12 +278,16 @@ ObserverPool::notify( cppy::ptr& topic, cppy::ptr& args, cppy::ptr& kwargs, uint
{
if( obs_it->m_observer.is_truthy() )
{
if( obs_it->enabled( change_types ) && !obs_it->m_observer.call( args, kwargs ) )
return false;
if( obs_it->enabled( change_types ) )
{
cppy::ptr ok( obs_it->m_observer.call( args, kwargs ) );
if (!ok)
return false;
}
}
else
{
ModifyTask* task = new RemoveTask( this, topic, obs_it->m_observer );
ModifyTask* task = new RemoveTask( this, topic.get(), obs_it->m_observer.get() );
m_modify_guard->add_task( task );
}
}
Expand Down Expand Up @@ -335,7 +341,7 @@ ObserverPoolManager::acquire_pool(uint32_t &index)
if ( m_pools.size() >= MAX_OBSERVER_POOL_COUNT)
return false;
index = m_pools.size();
m_pools.emplace_back();
m_pools.emplace_back(new ObserverPool);
return true;
}
index = m_free_slots.back();
Expand All @@ -347,14 +353,15 @@ ObserverPoolManager::acquire_pool(uint32_t &index)
void
ObserverPoolManager::release_pool(uint32_t index)
{
if ( m_pools.at(index).has_guard() )
ObserverPool* pool = m_pools.at(index);
if ( pool->has_guard() )
{
m_pools.at(index).release(index);
return;
pool->release(index);
return; // Release when guard is finished
}
m_pools.at(index).clear();
pool->clear();
m_free_slots.emplace_back(index);
// pool size never decreases
// TODO: pool size never decreases
}


Expand Down
4 changes: 2 additions & 2 deletions atom/src/observerpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class ObserverPoolManager

// Access a pool at the given index
inline ObserverPool* access_pool(uint32_t index) {
return &m_pools.at(index);
return m_pools.at(index);
}

// Release and free the pool at the given index
Expand All @@ -117,7 +117,7 @@ class ObserverPoolManager
ObserverPoolManager() {}
~ObserverPoolManager() {}
private:
std::vector<ObserverPool> m_pools;
std::vector<ObserverPool*> m_pools;
std::vector<uint32_t> m_free_slots;
};

Expand Down

0 comments on commit 468b0c0

Please sign in to comment.