Skip to content

Commit 98d5e56

Browse files
[SPARK-51966][PYTHON] replace select.select() with select.poll() on posix
On glibc-based Linux systems, select() can monitor only file descriptor numbers that are less than FD_SETSIZE (1024). This is an unreasonably low limit for many modern applications.
1 parent 7019d5e commit 98d5e56

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

python/pyspark/accumulators.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,29 @@ def handle(self) -> None:
266266
auth_token = self.server.auth_token # type: ignore[attr-defined]
267267

268268
def poll(func: Callable[[], bool]) -> None:
    """Wait for readable data on ``self.rfile`` and invoke ``func`` on it.

    The loop wakes up at least once per second so that a change of
    ``self.server.server_shutdown`` is noticed even when no data arrives.
    It ends when the server is shutting down or when ``func`` returns True.

    Parameters
    ----------
    func : callable
        Called with no arguments each time ``self.rfile`` is reported
        readable. Returning True stops the polling loop.
    """
    fd = self.rfile.fileno()
    poller = None
    try:
        if os.name == "posix":
            # On posix systems use poll to avoid problems with file descriptor numbers
            # above 1024.
            poller = select.poll()
            poller.register(fd, select.POLLIN)

        while not self.server.server_shutdown:  # type: ignore[attr-defined]
            # Wake up every 1 second for new data -- don't block in case of shutdown.
            if poller is not None:
                # NOTE: poll() takes its timeout in MILLISECONDS, unlike select.select(),
                # which takes seconds. 1000 ms keeps the two branches equivalent.
                r = [ready for ready, event in poller.poll(1000) if event & select.POLLIN]
            else:
                # If poll is not available, use select.
                r, _, _ = select.select([fd], [], [], 1)
            if fd in r and func():
                break
    finally:
        # Drop the registration so the poll object no longer refers to the descriptor.
        if poller is not None:
            poller.unregister(fd)
274292

275293
def accum_updates() -> bool:
276294
num_updates = read_int(self.rfile)

python/pyspark/daemon.py

+18-2
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,22 @@ def handle_sigterm(*args):
144144
reuse = os.environ.get("SPARK_REUSE_WORKER")
145145

146146
# Initialization complete
147+
rlist = [0, listen_sock.fileno()]
148+
poller = None
147149
try:
150+
if os.name == "posix":
151+
# On posix systems use poll to avoid problems with file descriptor numbers above 1024.
152+
poller = select.poll()
153+
for fd in rlist:
154+
poller.register(fd, select.POLLIN)
155+
148156
while True:
149157
try:
150-
ready_fds = select.select([0, listen_sock], [], [], 1)[0]
158+
if poller is not None:
159+
ready_fds = [fd for fd, event in poller.poll(1)]
160+
else:
161+
# If poll is not available, use select.
162+
ready_fds = select.select(rlist, [], [], 1)[0]
151163
except select.error as ex:
152164
if ex[0] == EINTR:
153165
continue
@@ -165,7 +177,7 @@ def handle_sigterm(*args):
165177
except OSError:
166178
pass # process already died
167179

168-
if listen_sock in ready_fds:
180+
if listen_sock.fileno() in ready_fds:
169181
try:
170182
sock, _ = listen_sock.accept()
171183
except OSError as e:
@@ -238,6 +250,10 @@ def handle_sigterm(*args):
238250
sock.close()
239251

240252
finally:
253+
if poller is not None:
254+
for fd in rlist:
255+
poller.unregister(fd)
256+
241257
shutdown(1)
242258

243259

0 commit comments

Comments
 (0)