UCS: Introduce lightweight rwlock - 2
Artemy-Mellanox committed Jan 11, 2025
1 parent 89415df commit 5609ae7
Showing 8 changed files with 238 additions and 88 deletions.
5 changes: 5 additions & 0 deletions src/ucs/arch/aarch64/cpu.h

@@ -301,6 +301,11 @@ static inline ucs_status_t ucs_arch_get_cache_size(size_t *cache_sizes)
     return UCS_ERR_UNSUPPORTED;
 }
 
+static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
+{
+    asm volatile ("yield" ::: "memory");
+}
+
 END_C_DECLS
 
 #endif
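
For reference, a minimal sketch of the busy-wait pattern these per-architecture ucs_cpu_relax() implementations serve; the flag-polling loop below is illustrative, not part of the commit:

    #include <ucs/arch/cpu.h>

    /* Illustrative: spin until another thread sets *flag, yielding pipeline
     * resources (aarch64 "yield", x86 "pause", PPC SMT priority) on each
     * iteration of the loop. */
    static void wait_for_flag(volatile int *flag)
    {
        while (!*flag) {
            ucs_cpu_relax();
        }
    }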
59 changes: 59 additions & 0 deletions src/ucs/arch/atomic.h

@@ -8,6 +8,7 @@
 #ifndef UCS_ARCH_ATOMIC_H
 #define UCS_ARCH_ATOMIC_H
 
+#include <ucs/sys/compiler_def.h>
 #include <stdint.h>
 
 #if defined(__x86_64__)
@@ -138,4 +139,62 @@ UCS_DEFINE_ATOMIC_BOOL_CSWAP(16, w);
 UCS_DEFINE_ATOMIC_BOOL_CSWAP(32, l);
 UCS_DEFINE_ATOMIC_BOOL_CSWAP(64, q);
 
+
+#define UCS_ATOMIC_WEAK         UCS_BIT(0)
+#define UCS_ATOMIC_FENCE_LOCK   UCS_BIT(1)
+#define UCS_ATOMIC_FENCE_UNLOCK UCS_BIT(2)
+
+
+static UCS_F_ALWAYS_INLINE int
+ucs_atomic_memorder(unsigned flags)
+{
+    if (flags & UCS_ATOMIC_FENCE_LOCK) {
+        return __ATOMIC_ACQUIRE;
+    }
+
+    if (flags & UCS_ATOMIC_FENCE_UNLOCK) {
+        return __ATOMIC_RELEASE;
+    }
+
+    return __ATOMIC_RELAXED;
+}
+
+
+static UCS_F_ALWAYS_INLINE int
+ucs_atomic_get(int *ptr, unsigned flags)
+{
+    return __atomic_load_n(ptr, ucs_atomic_memorder(flags));
+}
+
+
+static UCS_F_ALWAYS_INLINE int
+ucs_atomic_fadd(int *ptr, int val, unsigned flags)
+{
+    return __atomic_fetch_add(ptr, val, ucs_atomic_memorder(flags));
+}
+
+
+static UCS_F_ALWAYS_INLINE void
+ucs_atomic_sub(int *ptr, int val, unsigned flags)
+{
+    __atomic_fetch_sub(ptr, val, ucs_atomic_memorder(flags));
+}
+
+
+static UCS_F_ALWAYS_INLINE void
+ucs_atomic_or(int *ptr, int val, unsigned flags)
+{
+    __atomic_fetch_or(ptr, val, ucs_atomic_memorder(flags));
+}
+
+
+static UCS_F_ALWAYS_INLINE int
+ucs_atomic_cswap(int *ptr, int old_val, int new_val, unsigned flags)
+{
+    return __atomic_compare_exchange_n(ptr, &old_val, new_val,
+                                       flags & UCS_ATOMIC_WEAK,
+                                       ucs_atomic_memorder(flags),
+                                       __ATOMIC_RELAXED);
+}
+
 #endif
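
To make the fence flags concrete, here is a hedged sketch (not from this commit) of a toy test-and-set lock built on the new wrappers; UCS_ATOMIC_FENCE_LOCK maps to __ATOMIC_ACQUIRE and UCS_ATOMIC_FENCE_UNLOCK to __ATOMIC_RELEASE, so critical-section accesses cannot move past either boundary:

    #include <ucs/arch/atomic.h>
    #include <ucs/arch/cpu.h>

    /* Illustrative toy spinlock: 0 = free, 1 = taken. */
    static void toy_lock(int *lock)
    {
        /* A weak CAS may fail spuriously, which is harmless inside a retry
         * loop; on success it applies acquire ordering. */
        while (!ucs_atomic_cswap(lock, 0, 1,
                                 UCS_ATOMIC_FENCE_LOCK | UCS_ATOMIC_WEAK)) {
            ucs_cpu_relax();
        }
    }

    static void toy_unlock(int *lock)
    {
        /* Release ordering publishes the critical-section writes before the
         * lock word is seen as free again. */
        ucs_atomic_sub(lock, 1, UCS_ATOMIC_FENCE_UNLOCK);
    }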
7 changes: 0 additions & 7 deletions src/ucs/arch/cpu.h

@@ -177,13 +177,6 @@ static inline int ucs_cpu_prefer_relaxed_order()
 const char *ucs_cpu_vendor_name();
 const char *ucs_cpu_model_name();
 
-#ifndef UCS_HAS_CPU_RELAX
-static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
-{
-    sched_yield();
-}
-#endif
-
 END_C_DECLS
 
 #endif
7 changes: 7 additions & 0 deletions src/ucs/arch/ppc64/cpu.h

@@ -99,6 +99,13 @@ static inline ucs_status_t ucs_arch_get_cache_size(size_t *cache_sizes)
     return UCS_ERR_UNSUPPORTED;
 }
 
+static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
+{
+    asm volatile ("or 1, 1, 1 \n"); /* hw threading low priority */
+    asm volatile ("or 2, 2, 2 \n"); /* hw threading normal priority */
+    asm volatile ("" ::: "memory");
+}
+
 END_C_DECLS
 
 #endif
5 changes: 5 additions & 0 deletions src/ucs/arch/rv64/cpu.h

@@ -111,6 +111,11 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len)
     memcpy(dst, src, len);
 }
 
+static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
+{
+    asm volatile ("" ::: "memory");
+}
+
 END_C_DECLS
 
 #endif
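
On RISC-V the new ucs_cpu_relax() is only a compiler barrier. As a hypothetical refinement (not in this commit), a toolchain that enables the Zihintpause extension, advertised via the __riscv_zihintpause predefine, could emit the architectural pause hint instead:

    /* Hypothetical variant, assuming assembler support for Zihintpause. */
    static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
    {
    #ifdef __riscv_zihintpause
        /* "pause" hints the core to reduce energy/contention while spinning */
        asm volatile ("pause" ::: "memory");
    #else
        asm volatile ("" ::: "memory");
    #endif
    }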
5 changes: 2 additions & 3 deletions src/ucs/arch/x86_64/cpu.h

@@ -135,13 +135,12 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len)
     ucs_x86_memcpy_sse_movntdqa(dst, src, len);
 }
 
-#ifdef __SSE2__
 static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
 {
+#ifdef __SSE2__
     _mm_pause();
-}
-#define UCS_HAS_CPU_RELAX
 #endif
+}
 
 END_C_DECLS
 
79 changes: 48 additions & 31 deletions src/ucs/type/rwlock.h

@@ -7,11 +7,14 @@
 #ifndef UCS_RWLOCK_H
 #define UCS_RWLOCK_H
 
+#include <ucs/arch/atomic.h>
 #include <ucs/arch/cpu.h>
+#include <ucs/debug/assert.h>
+#include <ucs/sys/compiler_def.h>
 #include <errno.h>
 
 /**
- * The ucs_rwlock_t type.
+ * The ucs_rw_spinlock_t type.
  *
  * Readers increment the counter by UCS_RWLOCK_READ (4)
  * Writers set the UCS_RWLOCK_WRITE bit when lock is held
@@ -27,94 +30,108 @@
  * WAIT: writer pending --/
  */
 
-#define UCS_RWLOCK_WAIT  0x1 /* Writer is waiting */
-#define UCS_RWLOCK_WRITE 0x2 /* Writer has the lock */
+#define UCS_RWLOCK_WAIT  UCS_BIT(0) /* Writer is waiting */
+#define UCS_RWLOCK_WRITE UCS_BIT(1) /* Writer has the lock */
 #define UCS_RWLOCK_MASK  (UCS_RWLOCK_WAIT | UCS_RWLOCK_WRITE)
-#define UCS_RWLOCK_READ  0x4 /* Reader increment */
+#define UCS_RWLOCK_READ  UCS_BIT(2) /* Reader increment */
 
 #define UCS_RWLOCK_STATIC_INITIALIZER {0}
 
 
 /**
- * Read-write lock.
+ * Reader-writer spin lock.
  */
 typedef struct {
-    volatile int l;
-} ucs_rwlock_t;
+    int state;
+} ucs_rw_spinlock_t;
 
 
-static inline void ucs_rwlock_read_lock(ucs_rwlock_t *lock)
+static UCS_F_ALWAYS_INLINE void
+ucs_rw_spinlock_read_lock(ucs_rw_spinlock_t *lock)
 {
     int x;
 
-    while (1) {
-        while (lock->l & UCS_RWLOCK_MASK) {
+    for (;;) {
+        while (ucs_atomic_get(&lock->state, 0) & UCS_RWLOCK_MASK) {
             ucs_cpu_relax();
         }
 
-        x = __atomic_fetch_add(&lock->l, UCS_RWLOCK_READ, __ATOMIC_ACQUIRE);
+        x = ucs_atomic_fadd(&lock->state, UCS_RWLOCK_READ,
+                            UCS_ATOMIC_FENCE_LOCK);
         if (!(x & UCS_RWLOCK_MASK)) {
             return;
         }
 
-        __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
+        ucs_atomic_sub(&lock->state, UCS_RWLOCK_READ, 0);
     }
 }
 
 
-static inline void ucs_rwlock_read_unlock(ucs_rwlock_t *lock)
+static UCS_F_ALWAYS_INLINE void
+ucs_rw_spinlock_read_unlock(ucs_rw_spinlock_t *lock)
 {
-    __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
+    ucs_assert(lock->state >= UCS_RWLOCK_READ);
+    ucs_atomic_sub(&lock->state, UCS_RWLOCK_READ, UCS_ATOMIC_FENCE_UNLOCK);
 }
 
 
-static inline void ucs_rwlock_write_lock(ucs_rwlock_t *lock)
+static UCS_F_ALWAYS_INLINE void
+ucs_rw_spinlock_write_lock(ucs_rw_spinlock_t *lock)
 {
     int x;
 
-    while (1) {
-        x = lock->l;
+    for (;;) {
+        x = ucs_atomic_get(&lock->state, 0);
         if ((x < UCS_RWLOCK_WRITE) &&
-            (__atomic_compare_exchange_n(&lock->l, &x, UCS_RWLOCK_WRITE, 0,
-                                         __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
+            ucs_atomic_cswap(&lock->state, x, UCS_RWLOCK_WRITE,
+                             UCS_ATOMIC_FENCE_LOCK)) {
            return;
        }
 
        if (!(x & UCS_RWLOCK_WAIT)) {
-            __atomic_fetch_or(&lock->l, UCS_RWLOCK_WAIT, __ATOMIC_RELAXED);
+            ucs_atomic_or(&lock->state, UCS_RWLOCK_WAIT, 0);
        }
 
-        while (lock->l > UCS_RWLOCK_WAIT) {
+        while (ucs_atomic_get(&lock->state, 0) > UCS_RWLOCK_WAIT) {
            ucs_cpu_relax();
        }
    }
 }
 
 
-static inline int ucs_rwlock_write_trylock(ucs_rwlock_t *lock)
+static UCS_F_ALWAYS_INLINE int
+ucs_rw_spinlock_write_trylock(ucs_rw_spinlock_t *lock)
 {
     int x;
 
-    x = lock->l;
+    x = ucs_atomic_get(&lock->state, 0);
     if ((x < UCS_RWLOCK_WRITE) &&
-        (__atomic_compare_exchange_n(&lock->l, &x, x + UCS_RWLOCK_WRITE, 1,
-                                     __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
-        return 0;
+        ucs_atomic_cswap(&lock->state, x, x + UCS_RWLOCK_WRITE,
+                         UCS_ATOMIC_FENCE_LOCK | UCS_ATOMIC_WEAK)) {
+        return 1;
    }
 
-    return -EBUSY;
+    return 0;
 }
 
 
-static inline void ucs_rwlock_write_unlock(ucs_rwlock_t *lock)
+static UCS_F_ALWAYS_INLINE void
+ucs_rw_spinlock_write_unlock(ucs_rw_spinlock_t *lock)
 {
-    __atomic_fetch_sub(&lock->l, UCS_RWLOCK_WRITE, __ATOMIC_RELAXED);
+    ucs_assert(lock->state >= UCS_RWLOCK_WRITE);
+    ucs_atomic_sub(&lock->state, UCS_RWLOCK_WRITE, UCS_ATOMIC_FENCE_UNLOCK);
 }
 
 
-static inline void ucs_rwlock_init(ucs_rwlock_t *lock)
+static UCS_F_ALWAYS_INLINE void ucs_rw_spinlock_init(ucs_rw_spinlock_t *lock)
 {
-    lock->l = 0;
+    lock->state = 0;
 }
 
+
+static UCS_F_ALWAYS_INLINE void ucs_rw_spinlock_cleanup(ucs_rw_spinlock_t *lock)
+{
+    ucs_assert(lock->state == 0);
+}
+
 #endif
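
Finally, a hedged usage sketch of the renamed API (the surrounding reader/writer functions are illustrative; the state transitions in the comments follow the encoding documented in the header):

    #include <ucs/type/rwlock.h>

    static ucs_rw_spinlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;
    static int shared_value;

    static int read_value(void)
    {
        int v;

        ucs_rw_spinlock_read_lock(&lock);   /* state += UCS_RWLOCK_READ (4) */
        v = shared_value;
        ucs_rw_spinlock_read_unlock(&lock); /* state -= UCS_RWLOCK_READ */
        return v;
    }

    static void write_value(int v)
    {
        ucs_rw_spinlock_write_lock(&lock);   /* sets UCS_RWLOCK_WRITE */
        shared_value = v;
        ucs_rw_spinlock_write_unlock(&lock); /* clears UCS_RWLOCK_WRITE */
    }

    static int try_write_value(int v)
    {
        /* Note the new convention: trylock returns nonzero on success,
         * replacing the previous 0 / -EBUSY return values. */
        if (!ucs_rw_spinlock_write_trylock(&lock)) {
            return 0;
        }
        shared_value = v;
        ucs_rw_spinlock_write_unlock(&lock);
        return 1;
    }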