Skip to content

a PariThreadPool to handle multithreading via Python #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ on:
push:
tags:
- '*'
workflow_dispatch:
# Allow to run manually

jobs:
build:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cypari2/handle_error.c
cypari2/pari_instance.c
cypari2/stack.c
cypari2/string_utils.c
cypari2/threads.c

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
1 change: 1 addition & 0 deletions cypari2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .pari_instance import Pari
from .handle_error import PariError
from .gen import Gen
from .threads import PariThreadPool
5 changes: 4 additions & 1 deletion cypari2/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ cdef Gen new_gen_noclear(GEN x):
elif isclone(x):
gclone_refc(x)
return Gen_new(x, x)
raise SystemError("new_gen() argument not on PARI stack, not on PARI heap and not a universal constant")
else:
# NOTE: it might be the case that x belongs to a local stack of a thread
# In that case we copy it in the main stack
x = gcopy(x)

z = Gen_stack_new(x)

Expand Down
6 changes: 6 additions & 0 deletions cypari2/threads.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .types cimport *

cdef class PariThreadPool:
cdef size_t nbthreads
cdef pari_thread * pths
cdef size_t ithread
82 changes: 82 additions & 0 deletions cypari2/threads.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
r"""
Multithreading from Python
**************************
"""

#*****************************************************************************
# Copyright (C) 2022 Vincent Delecroix <[email protected]>
#
# Distributed under the terms of the GNU General Public License (GPL)
# as published by the Free Software Foundation; either version 2 of
# the License, or (at your option) any later version.
# http://www.gnu.org/licenses/
#*****************************************************************************

from libc.stdlib cimport malloc, calloc, free

from .types cimport *
from .paridecl cimport *
from gen cimport Gen, objtogen

cdef class PariThreadPool:
r"""
Pari thread allocator

This class is intended to be used in conjunction with the multithreading
capabilities of the ``ThreadPoolExecutor`` from the ``concurrent.futures``
Python library.

Examples:

>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>> from cypari2 import Pari, PariThreadPool
>>> pari = Pari()
>>> pari.default('nbthreads', 1)
>>> max_workers = 4
>>> pari_pool = PariThreadPool(max_workers)
>>> square_free = []
>>> with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor:
... futures = {executor.submit(pari.issquarefree, n): n for n in range(10**6, 10**6 + 1000)}
... for future in as_completed(futures):
... n = futures[future]
... if future.result():
... square_free.append(n)
>>> square_free.sort()
>>> square_free
[1000001, 1000002, 1000003, 1000005, 1000006, ..., 1000994, 1000995, 1000997, 1000999]
"""
def __init__(self, size_t nbthreads, size_t size=8000000, size_t sizemax=0):
r"""
INPUT:

- ``nbthreads`` -- the number of threads to allocate

- ``size`` -- (default: 8000000) the number of bytes for the
initial PARI stack (see notes below)

- ``sizemax`` -- (default: 0) the maximal number of bytes for the
dynamically increasing PARI stack.
"""
cdef size_t i
size = max(size, pari_mainstack.rsize)
sizemax = max(max(size, pari_mainstack.vsize), sizemax)
self.pths = <pari_thread *> calloc(nbthreads, sizeof(pari_thread))
for i in range(nbthreads):
pari_thread_valloc(self.pths + i, size, sizemax, NULL)
self.ithread = 0
self.nbthreads = nbthreads

def __dealloc__(self):
cdef size_t i
for i in range(self.ithread):
pari_thread_free(self.pths + i)
free(self.pths)

def __repr__(self):
return 'Pari thread pool with {} threads'.format(self.nbthreads)

def initializer(self):
if self.ithread >= self.nbthreads:
raise ValueError('no more thread available')
pari_thread_start(self.pths + self.ithread)
self.ithread += 1
3 changes: 2 additions & 1 deletion cypari2/types.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ cdef extern from "pari/pari.h":
struct pariFILE
struct pari_mt
struct pari_stack
struct pari_thread
struct pari_thread:
pass
struct pari_timer
struct GENbin
struct hashentry
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Welcome to CyPari2's documentation!
closure
handle_error
convert
threads


Indices and tables
Expand Down
2 changes: 2 additions & 0 deletions docs/source/threads.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.. automodule:: cypari2.threads
:members:
2 changes: 1 addition & 1 deletion tests/rundoctest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
attempted = 0
for mod in [cypari2.closure, cypari2.convert, cypari2.gen,
cypari2.handle_error, cypari2.pari_instance, cypari2.stack,
cypari2.string_utils,
cypari2.string_utils, cypari2.threads,
autogen.doc, autogen.generator, autogen.parser,
autogen.paths]:

Expand Down