Skip to content

Commit

Permalink
Try without using pointers
Browse files Browse the repository at this point in the history
  • Loading branch information
frmdstryr committed Feb 2, 2025
1 parent 5f710ed commit e799ac3
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 54 deletions.
13 changes: 10 additions & 3 deletions atom/src/catom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ CAtom_clear( CAtom* self )
}
if( self->meta.has_observers )
{
// This atom is done with the pool, release it even if there are pending guards
ObserverPoolManager::get()->release_pool(self->meta.pool_index);
self->meta.has_observers = false;
}
Expand Down Expand Up @@ -600,7 +601,7 @@ CAtom::observe( PyObject* topic, PyObject* callback, uint8_t change_types )
if( !meta.has_observers )
{
uint32_t index;
if ( !ObserverPoolManager::get()->aquire_pool(index) )
if ( !ObserverPoolManager::get()->acquire_pool(index) )
{
cppy::system_error("Observer pool filled");
return false;
Expand Down Expand Up @@ -641,8 +642,14 @@ CAtom::unobserve()
{
if( !meta.has_observers )
return true;
ObserverPoolManager::get()->release_pool(meta.pool_index);
meta.has_observers = false;
observer_pool()->clear();
if ( !observer_pool()->has_guard() )
{
// Do not release the pool unless the guard is released
// as a modification task could still need to add stuff back into it
ObserverPoolManager::get()->release_pool(meta.pool_index);
meta.has_observers = false;
}
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion atom/src/member.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ Member::notify( CAtom* atom, PyObject* args, PyObject* kwargs, uint8_t change_ty
{
if( static_observers && atom->get_notifications_enabled() )
{
ModifyGuard<Member> guard( *this );
ModifyGuard<Member> guard( this );
cppy::ptr argsptr( cppy::incref( args ) );
cppy::ptr kwargsptr( cppy::xincref( kwargs ) );
cppy::ptr objectptr( cppy::incref( pyobject_cast( atom ) ) );
Expand Down
13 changes: 7 additions & 6 deletions atom/src/modifyguard.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class ModifyGuard

public:

ModifyGuard( _T& owner ) : m_owner( owner )
ModifyGuard( _T* owner ) : m_owner( owner )
{
if( !m_owner.get_modify_guard() )
m_owner.set_modify_guard( this );
if( !m_owner->get_modify_guard() )
m_owner->set_modify_guard( this );
}

~ModifyGuard()
Expand All @@ -43,9 +43,10 @@ class ModifyGuard
exception_set = true;
}

if( m_owner.get_modify_guard() == this )
if( m_owner->get_modify_guard() == this )
{
m_owner.set_modify_guard( 0 );
m_owner->set_modify_guard( 0 );

std::vector<ModifyTask*>::iterator it;
std::vector<ModifyTask*>::iterator end = m_tasks.end();
for( it = m_tasks.begin(); it != end; ++it )
Expand All @@ -64,7 +65,7 @@ class ModifyGuard

private:

_T& m_owner;
_T* m_owner;
std::vector<ModifyTask*> m_tasks;

};
Expand Down
89 changes: 71 additions & 18 deletions atom/src/observerpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,58 @@ namespace

struct BaseTask : public ModifyTask
{
BaseTask( ObserverPool& pool, cppy::ptr& topic, cppy::ptr& observer ) :
BaseTask( ObserverPool* pool, cppy::ptr& topic, cppy::ptr& observer ) :
m_pool( pool ), m_topic( topic ), m_observer( observer ) {}
ObserverPool& m_pool;
ObserverPool* m_pool;
cppy::ptr m_topic;
cppy::ptr m_observer;
};


struct AddTask : public BaseTask
{
AddTask( ObserverPool& pool, cppy::ptr& topic, cppy::ptr& observer, uint8_t change_types ) :
AddTask( ObserverPool* pool, cppy::ptr& topic, cppy::ptr& observer, uint8_t change_types ) :
BaseTask( pool, topic, observer ), m_change_types(change_types) {}
void run() { m_pool.add( m_topic, m_observer, m_change_types ); }
void run() { m_pool->add( m_topic, m_observer, m_change_types ); }
uint8_t m_change_types;
};


struct RemoveTask : public BaseTask
{
RemoveTask( ObserverPool& pool, cppy::ptr& topic, cppy::ptr& observer ) :
RemoveTask( ObserverPool* pool, cppy::ptr& topic, cppy::ptr& observer ) :
BaseTask( pool, topic, observer ) {}
void run() { m_pool.remove( m_topic, m_observer ); }
void run() { m_pool->remove( m_topic, m_observer ); }
};


struct RemoveTopicTask : ModifyTask
{
RemoveTopicTask( ObserverPool& pool, cppy::ptr& topic ) :
RemoveTopicTask( ObserverPool* pool, cppy::ptr& topic ) :
m_pool( pool ), m_topic( topic ) {}
void run() { m_pool.remove( m_topic ); }
ObserverPool& m_pool;
void run() { m_pool->remove( m_topic ); }
ObserverPool* m_pool;
cppy::ptr m_topic;
};

struct ClearTask : public ModifyTask
{
ClearTask ( ObserverPool* pool ) : m_pool( pool ) {}
void run() { m_pool->clear( ); }
ObserverPool* m_pool;
};

struct ReleaseTask : public ModifyTask
{
ReleaseTask ( ObserverPool* pool, uint32_t pool_index )
: m_pool( pool ), m_pool_index( pool_index ) {}
void run()
{
m_pool->release( m_pool_index );
}
ObserverPool* m_pool;
uint32_t m_pool_index;
};

} // namespace


Expand Down Expand Up @@ -99,9 +117,12 @@ ObserverPool::has_observer( cppy::ptr& topic, cppy::ptr& observer, uint8_t chang
void
ObserverPool::add( cppy::ptr& topic, cppy::ptr& observer, uint8_t change_types )
{
if ( !observer.is_truthy() )
return; // A deferred add on a dead atom ?

if( m_modify_guard )
{
ModifyTask* task = new AddTask( *this, topic, observer, change_types );
ModifyTask* task = new AddTask( this, topic, observer, change_types );
m_modify_guard->add_task( task );
return;
}
Expand Down Expand Up @@ -149,7 +170,7 @@ ObserverPool::remove( cppy::ptr& topic, cppy::ptr& observer )
{
if( m_modify_guard )
{
ModifyTask* task = new RemoveTask( *this, topic, observer );
ModifyTask* task = new RemoveTask( this, topic, observer );
m_modify_guard->add_task( task );
return;
}
Expand Down Expand Up @@ -186,7 +207,7 @@ ObserverPool::remove( cppy::ptr& topic )
{
if( m_modify_guard )
{
ModifyTask* task = new RemoveTopicTask( *this, topic );
ModifyTask* task = new RemoveTopicTask( this, topic );
m_modify_guard->add_task( task );
return;
}
Expand All @@ -208,11 +229,38 @@ ObserverPool::remove( cppy::ptr& topic )
}
}

void
ObserverPool::clear( )
{
if( m_modify_guard )
{
ModifyTask* task = new ClearTask( this );
m_modify_guard->add_task( task );
return;
}
m_topics.clear();
m_observers.clear();
}

void
ObserverPool::release( uint32_t index )
{
if( m_modify_guard )
{
ModifyTask* task = new ReleaseTask( this, index );
m_modify_guard->add_task( task );
return;
}
clear();
ObserverPoolManager::get()->release_pool( index );
}



bool
ObserverPool::notify( cppy::ptr& topic, cppy::ptr& args, cppy::ptr& kwargs, uint8_t change_types )
{
ModifyGuard<ObserverPool> guard( *this );
ModifyGuard<ObserverPool> guard( this );
uint32_t obs_offset = 0;
std::vector<Topic>::iterator topic_it;
std::vector<Topic>::iterator topic_end = m_topics.end();
Expand All @@ -233,7 +281,7 @@ ObserverPool::notify( cppy::ptr& topic, cppy::ptr& args, cppy::ptr& kwargs, uint
}
else
{
ModifyTask* task = new RemoveTask( *this, topic, obs_it->m_observer );
ModifyTask* task = new RemoveTask( this, topic, obs_it->m_observer );
m_modify_guard->add_task( task );
}
}
Expand Down Expand Up @@ -280,14 +328,14 @@ ObserverPoolManager::get()


bool
ObserverPoolManager::aquire_pool(uint32_t &index)
ObserverPoolManager::acquire_pool(uint32_t &index)
{
if ( m_free_slots.empty() )
{
if ( m_pools.size() >= MAX_OBSERVER_POOL_COUNT)
return false;
index = m_pools.size();
m_pools.emplace_back(new ObserverPool);
m_pools.emplace_back();
return true;
}
index = m_free_slots.back();
Expand All @@ -299,7 +347,12 @@ ObserverPoolManager::aquire_pool(uint32_t &index)
void
ObserverPoolManager::release_pool(uint32_t index)
{
m_pools.at(index)->clear();
if ( m_pools.at(index).has_guard() )
{
m_pools.at(index).release(index);
return;
}
m_pools.at(index).clear();
m_free_slots.emplace_back(index);
// pool size never decreases
}
Expand Down
31 changes: 13 additions & 18 deletions atom/src/observerpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class ObserverPool

~ObserverPool() {}

bool has_guard()
{
return m_modify_guard != nullptr;
}

bool has_topic( cppy::ptr& topic );

bool has_observer( cppy::ptr& topic, cppy::ptr& observer )
Expand All @@ -62,6 +67,11 @@ class ObserverPool

void remove( cppy::ptr& topic );

void clear();

// Clear and release back into the pool manager after the guard is released
void release( uint32_t pool_index );

bool notify( cppy::ptr& topic, cppy::ptr& args, cppy::ptr& kwargs )
{
return notify( topic, args, kwargs, ChangeType::Any );
Expand All @@ -79,26 +89,11 @@ class ObserverPool

int py_traverse( visitproc visit, void* arg );

void clear()
{
m_topics.clear();
// Clearing the vector may cause arbitrary side effects on item
// decref, including calls into methods which mutate the vector.
// To avoid segfaults, first make the vector empty, then let the
// destructors run for the old items.
std::vector<Observer> empty;
m_observers.swap( empty );
m_modify_guard = 0;
}

private:

ModifyGuard<ObserverPool>* m_modify_guard;
std::vector<Topic> m_topics;
std::vector<Observer> m_observers;
ObserverPool(const ObserverPool& other);
ObserverPool& operator=(const ObserverPool&);

};


Expand All @@ -109,11 +104,11 @@ class ObserverPoolManager
static ObserverPoolManager* get();

// Aquire a new ObserverPool. If no free spots are available, allocate a new spot
bool aquire_pool(uint32_t &index);
bool acquire_pool(uint32_t &index);

// Access a pool at the given index
inline ObserverPool* access_pool(uint32_t index) {
return m_pools.at(index);
return &m_pools.at(index);
}

// Release and free the pool at the given index
Expand All @@ -122,7 +117,7 @@ class ObserverPoolManager
ObserverPoolManager() {}
~ObserverPoolManager() {}
private:
std::vector<ObserverPool*> m_pools;
std::vector<ObserverPool> m_pools;
std::vector<uint32_t> m_free_slots;
};

Expand Down
16 changes: 8 additions & 8 deletions tests/test_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ class Point(Atom):
observer_size = 16
assert sys.getsizeof(p) == atom_size + 2 * object_size
p.observe("x", lambda c: None)
if "darwin" in sys.platform:
pool_ptr_size = 24 # wtf???
else:
pool_ptr_size = 8
pool_size = 56 + 4 + pool_ptr_size
assert sys.getsizeof(p) == (
atom_size + 2 * object_size + pool_size + topic_size + observer_size
)
# if "darwin" in sys.platform:
# pool_ptr_size = 24 # wtf???
# else:
# pool_ptr_size = 8
# pool_size = 56 + 4 + pool_ptr_size
# assert sys.getsizeof(p) == (
# atom_size + 2 * object_size + pool_size + topic_size + observer_size
# )
Loading

0 comments on commit e799ac3

Please sign in to comment.