@@ -68,6 +68,18 @@ using ipcTestParams =
 struct umfIpcTest : umf_test::test,
                     ::testing::WithParamInterface<ipcTestParams> {
     umfIpcTest() {}
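+    // Reads the UMF_MAX_OPENED_IPC_HANDLES environment variable and returns
+    // its numeric value, or 0 when it is unset or not a valid number.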
+    size_t getOpenedIpcCacheSize() {
+        const char *max_size_str = getenv("UMF_MAX_OPENED_IPC_HANDLES");
+        if (max_size_str) {
+            char *endptr;
+            size_t max_size = strtoul(max_size_str, &endptr, 10);
+            EXPECT_EQ(*endptr, '\0');
+            if (*endptr == '\0') {
+                return max_size;
+            }
+        }
+        return 0;
+    }
     void SetUp() override {
         test::SetUp();
         auto [pool_ops, pool_params_create, pool_params_destroy, provider_ops,
@@ -80,6 +92,7 @@ struct umfIpcTest : umf_test::test,
         providerParamsCreate = provider_params_create;
         providerParamsDestroy = provider_params_destroy;
         memAccessor = accessor;
+        openedIpcCacheSize = getOpenedIpcCacheSize();
     }

     void TearDown() override { test::TearDown(); }
@@ -160,6 +173,7 @@ struct umfIpcTest : umf_test::test,
     umf_memory_provider_ops_t *providerOps = nullptr;
     pfnProviderParamsCreate providerParamsCreate = nullptr;
     pfnProviderParamsDestroy providerParamsDestroy = nullptr;
+    size_t openedIpcCacheSize = 0;

     void concurrentGetConcurrentPutHandles(bool shuffle) {
         std::vector<void *> ptrs;
@@ -264,6 +278,156 @@ struct umfIpcTest : umf_test::test,
         pool.reset(nullptr);
         EXPECT_EQ(stat.putCount, stat.getCount);
     }
+
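+    // All threads first open every IPC handle (each in its own order when
+    // shuffle is true), then all threads close the handles they opened.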
+    void concurrentOpenConcurrentCloseHandles(bool shuffle) {
+        umf_result_t ret;
+        std::vector<void *> ptrs;
+        constexpr size_t ALLOC_SIZE = 100;
+        constexpr size_t NUM_POINTERS = 100;
+        umf::pool_unique_handle_t pool = makePool();
+        ASSERT_NE(pool.get(), nullptr);
+
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
+            EXPECT_NE(ptr, nullptr);
+            ptrs.push_back(ptr);
+        }
+
+        std::vector<umf_ipc_handle_t> ipcHandles;
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            umf_ipc_handle_t ipcHandle;
+            size_t handleSize;
+            ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
+            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+            ipcHandles.push_back(ipcHandle);
+        }
+
+        std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
+        umf_ipc_handler_handle_t ipcHandler = nullptr;
+        ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
+        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+        ASSERT_NE(ipcHandler, nullptr);
+
+        umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+        auto openHandlesFn = [shuffle, &ipcHandles, &openedIpcHandles,
+                              &syncthreads, ipcHandler](size_t tid) {
+            // Each thread gets its own copy of the IPC handles so it can
+            // shuffle them independently
+            std::vector<umf_ipc_handle_t> localIpcHandles = ipcHandles;
+            if (shuffle) {
+                std::random_device rd;
+                std::mt19937 g(rd());
+                std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
+            }
+            syncthreads();
+            for (auto ipcHandle : localIpcHandles) {
+                void *ptr;
+                umf_result_t ret =
+                    umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
+                ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+                openedIpcHandles[tid].push_back(ptr);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, openHandlesFn);
+
+        auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
+            syncthreads();
+            for (void *ptr : openedIpcHandles[tid]) {
+                umf_result_t ret = umfCloseIPCHandle(ptr);
+                EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, closeHandlesFn);
+
+        for (auto ipcHandle : ipcHandles) {
+            ret = umfPutIPCHandle(ipcHandle);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        for (void *ptr : ptrs) {
+            ret = umfPoolFree(pool.get(), ptr);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        pool.reset(nullptr);
+        EXPECT_EQ(stat.getCount, stat.allocCount);
+        EXPECT_EQ(stat.putCount, stat.getCount);
+        EXPECT_EQ(stat.openCount, stat.allocCount);
+        EXPECT_EQ(stat.openCount, stat.closeCount);
+    }
+
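+    // Each thread opens and immediately closes every IPC handle, so open and
+    // close calls from different threads interleave.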
+    void concurrentOpenCloseHandles(bool shuffle) {
+        umf_result_t ret;
+        std::vector<void *> ptrs;
+        constexpr size_t ALLOC_SIZE = 100;
+        constexpr size_t NUM_POINTERS = 100;
+        umf::pool_unique_handle_t pool = makePool();
+        ASSERT_NE(pool.get(), nullptr);
+
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
+            EXPECT_NE(ptr, nullptr);
+            ptrs.push_back(ptr);
+        }
+
+        std::vector<umf_ipc_handle_t> ipcHandles;
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            umf_ipc_handle_t ipcHandle;
+            size_t handleSize;
+            ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
+            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+            ipcHandles.push_back(ipcHandle);
+        }
+
+        umf_ipc_handler_handle_t ipcHandler = nullptr;
+        ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
+        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+        ASSERT_NE(ipcHandler, nullptr);
+
+        umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+        auto openCloseHandlesFn = [shuffle, &ipcHandles, &syncthreads,
+                                   ipcHandler](size_t) {
+            // Each thread gets its own copy of the IPC handles so it can
+            // shuffle them independently
+            std::vector<umf_ipc_handle_t> localIpcHandles = ipcHandles;
+            if (shuffle) {
+                std::random_device rd;
+                std::mt19937 g(rd());
+                std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
+            }
+            syncthreads();
+            for (auto ipcHandle : localIpcHandles) {
+                void *ptr;
+                umf_result_t ret =
+                    umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
+                ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+                ret = umfCloseIPCHandle(ptr);
+                EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, openCloseHandlesFn);
+
+        for (auto ipcHandle : ipcHandles) {
+            ret = umfPutIPCHandle(ipcHandle);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        for (void *ptr : ptrs) {
+            ret = umfPoolFree(pool.get(), ptr);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        pool.reset(nullptr);
+        EXPECT_EQ(stat.getCount, stat.allocCount);
+        EXPECT_EQ(stat.putCount, stat.getCount);
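+        // The openCount check below is applied only when no limit is set via
+        // UMF_MAX_OPENED_IPC_HANDLES (openedIpcCacheSize == 0); with a limited
+        // cache of opened handles the recorded open count may differ from
+        // allocCount.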
+        if (openedIpcCacheSize == 0) {
+            EXPECT_EQ(stat.openCount, stat.allocCount);
+        }
+        EXPECT_EQ(stat.openCount, stat.closeCount);
+    }
 };

 TEST_P(umfIpcTest, GetIPCHandleSize) {
@@ -529,75 +693,20 @@ TEST_P(umfIpcTest, ConcurrentGetPutHandlesShuffled) {
     concurrentGetPutHandles(true);
 }

-TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
-    umf_result_t ret;
-    std::vector<void *> ptrs;
-    constexpr size_t ALLOC_SIZE = 100;
-    constexpr size_t NUM_POINTERS = 100;
-    umf::pool_unique_handle_t pool = makePool();
-    ASSERT_NE(pool.get(), nullptr);
-
-    for (size_t i = 0; i < NUM_POINTERS; ++i) {
-        void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
-        EXPECT_NE(ptr, nullptr);
-        ptrs.push_back(ptr);
-    }
-
-    std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
-    for (size_t i = 0; i < NUM_POINTERS; ++i) {
-        umf_ipc_handle_t ipcHandle;
-        size_t handleSize;
-        ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
-        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-        ipcHandles[i] = ipcHandle;
-    }
-
-    std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
-    umf_ipc_handler_handle_t ipcHandler = nullptr;
-    ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
-    ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-    ASSERT_NE(ipcHandler, nullptr);
-
-    umf_test::syncthreads_barrier syncthreads(NTHREADS);
-
-    auto openHandlesFn = [&ipcHandles, &openedIpcHandles, &syncthreads,
-                          ipcHandler](size_t tid) {
-        syncthreads();
-        for (auto ipcHandle : ipcHandles) {
-            void *ptr;
-            umf_result_t ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
-            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-            openedIpcHandles[tid].push_back(ptr);
-        }
-    };
-
-    umf_test::parallel_exec(NTHREADS, openHandlesFn);
-
-    auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
-        syncthreads();
-        for (void *ptr : openedIpcHandles[tid]) {
-            umf_result_t ret = umfCloseIPCHandle(ptr);
-            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-        }
-    };
-
-    umf_test::parallel_exec(NTHREADS, closeHandlesFn);
+TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandles) {
+    concurrentOpenConcurrentCloseHandles(false);
+}

-    for (auto ipcHandle : ipcHandles) {
-        ret = umfPutIPCHandle(ipcHandle);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
+TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandlesShuffled) {
+    concurrentOpenConcurrentCloseHandles(true);
+}

-    for (void *ptr : ptrs) {
-        ret = umfPoolFree(pool.get(), ptr);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
+TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
+    concurrentOpenCloseHandles(false);
+}

-    pool.reset(nullptr);
-    EXPECT_EQ(stat.getCount, stat.allocCount);
-    EXPECT_EQ(stat.putCount, stat.getCount);
-    EXPECT_EQ(stat.openCount, stat.allocCount);
-    EXPECT_EQ(stat.openCount, stat.closeCount);
+TEST_P(umfIpcTest, ConcurrentOpenCloseHandlesShuffled) {
+    concurrentOpenCloseHandles(true);
 }

 TEST_P(umfIpcTest, ConcurrentDestroyIpcHandlers) {