From 0d6dbb15f2613bd3aad03025571987200f2a09d5 Mon Sep 17 00:00:00 2001 From: Song Lin Date: Wed, 4 Mar 2026 16:39:03 +0800 Subject: [PATCH 1/9] fail pending resluts when exception occurs in _handle_results --- Lib/multiprocessing/connection.py | 3 ++- Lib/multiprocessing/pool.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 41b36066c62fcb..7c1b7ecaa93f22 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -255,7 +255,8 @@ def recv(self): self._check_closed() self._check_readable() buf = self._recv_bytes() - return _ForkingPickler.loads(buf.getbuffer()) + res = _ForkingPickler.loads(buf.getbuffer()) + return res def poll(self, timeout=0.0): """Whether there is any input available to be read""" diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..ed13f91cb7ce23 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -158,6 +158,19 @@ def __init__(self, /, *args, notifier=None, **kwds): self.notifier = notifier super().__init__(*args, **kwds) + self._cache_failed = False + self._cache_failed_reason = None + + def _disable_cache(self, exec): + self._cache_failed = True + self._cache_failed_reason = exec + + def __setitem__(self, key, value): + if self._cache_failed: + raise RuntimeError("Pool cache is disabled due to previous error") \ + from self._cache_failed_reason + super().__setitem__(key, value) + def __delitem__(self, item): super().__delitem__(item) @@ -580,6 +593,23 @@ def _handle_results(outqueue, get, cache): except (OSError, EOFError): util.debug('result handler got EOFError/OSError -- exiting') return + except Exception as e: + exc = RuntimeError("Result handler failed to get result from worker and " + + "unable to maintain its states anymore. " + + "This is likely due to a worker process return or raise " + + "an unpicklable object.") + exc.__cause__ = e + cache._disable_cache(exc) + _cache = cache.copy() + for value in _cache.values(): + if isinstance(value, ApplyResult): + chunk_number_left = getattr(value, '_number_left', 1) + for _ in range(chunk_number_left): + value._set(None, (False, exc)) + elif isinstance(value, IMapIterator): + value._set_length(value._index + 1) + value._set(value._index, (False, exc)) + return if thread._state != RUN: assert thread._state == TERMINATE, "Thread not in TERMINATE" From fbd2fb7cec80e9230be1aef8b2f253591bd1bca5 Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 09:25:14 +0800 Subject: [PATCH 2/9] revert unexpected file change --- Lib/multiprocessing/connection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 7c1b7ecaa93f22..41b36066c62fcb 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -255,8 +255,7 @@ def recv(self): self._check_closed() self._check_readable() buf = self._recv_bytes() - res = _ForkingPickler.loads(buf.getbuffer()) - return res + return _ForkingPickler.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" From ecdd1f1a19b8a044818ba21e34fd03790514f683 Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 09:32:41 +0800 Subject: [PATCH 3/9] update exception --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index ed13f91cb7ce23..5b75ba0dcc4132 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -595,7 +595,7 @@ def _handle_results(outqueue, get, cache): return except Exception as e: exc = RuntimeError("Result handler failed to get result from worker and " + - "unable to maintain its states anymore. " + + "unable to recover. " + "This is likely due to a worker process return or raise " + "an unpicklable object.") exc.__cause__ = e From 8ed92405ba2f9bd4a09416347457c31f2f0ef964 Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 09:51:10 +0800 Subject: [PATCH 4/9] remove trim trailing whitespace --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 5b75ba0dcc4132..d2a83929e42cb7 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -595,7 +595,7 @@ def _handle_results(outqueue, get, cache): return except Exception as e: exc = RuntimeError("Result handler failed to get result from worker and " + - "unable to recover. " + + "unable to recover. " + "This is likely due to a worker process return or raise " + "an unpicklable object.") exc.__cause__ = e From f7ee9b0468767f4f0530034759a75eefb3c3438e Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 10:01:01 +0800 Subject: [PATCH 5/9] remove trim trailing whitespace --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index d2a83929e42cb7..e5d6528921bba7 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -164,7 +164,7 @@ def __init__(self, /, *args, notifier=None, **kwds): def _disable_cache(self, exec): self._cache_failed = True self._cache_failed_reason = exec - + def __setitem__(self, key, value): if self._cache_failed: raise RuntimeError("Pool cache is disabled due to previous error") \ From fcd9e7d01c29a3074232dc1953f7cd19321b8aa6 Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 10:07:29 +0800 Subject: [PATCH 6/9] remove EOFError/OSError except block --- Lib/multiprocessing/pool.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e5d6528921bba7..0ad6b4f2cbbcf5 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -590,9 +590,6 @@ def _handle_results(outqueue, get, cache): while 1: try: task = get() - except (OSError, EOFError): - util.debug('result handler got EOFError/OSError -- exiting') - return except Exception as e: exc = RuntimeError("Result handler failed to get result from worker and " + "unable to recover. " + From 96fdc94f63e030725432056d1965dae3b80d246f Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 11:00:29 +0800 Subject: [PATCH 7/9] add exception handle logic in anthor get() --- Lib/multiprocessing/pool.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 0ad6b4f2cbbcf5..faedbf4fede483 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -585,27 +585,30 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): @staticmethod def _handle_results(outqueue, get, cache): + def _handle_results_failure(cache, e): + exc = RuntimeError("Result handler failed to get result from worker and " + + "unable to recover. " + + "This is likely due to a worker process return or raise " + + "an unpicklable object.") + exc.__cause__ = e + cache._disable_cache(exc) + _cache = cache.copy() + for value in _cache.values(): + if isinstance(value, ApplyResult): + chunk_number_left = getattr(value, '_number_left', 1) + for _ in range(chunk_number_left): + value._set(None, (False, exc)) + elif isinstance(value, IMapIterator): + value._set_length(value._index + 1) + value._set(value._index, (False, exc)) + thread = threading.current_thread() while 1: try: task = get() except Exception as e: - exc = RuntimeError("Result handler failed to get result from worker and " + - "unable to recover. " + - "This is likely due to a worker process return or raise " + - "an unpicklable object.") - exc.__cause__ = e - cache._disable_cache(exc) - _cache = cache.copy() - for value in _cache.values(): - if isinstance(value, ApplyResult): - chunk_number_left = getattr(value, '_number_left', 1) - for _ in range(chunk_number_left): - value._set(None, (False, exc)) - elif isinstance(value, IMapIterator): - value._set_length(value._index + 1) - value._set(value._index, (False, exc)) + _handle_results_failure(cache, e) return if thread._state != RUN: @@ -627,8 +630,8 @@ def _handle_results(outqueue, get, cache): while cache and thread._state != TERMINATE: try: task = get() - except (OSError, EOFError): - util.debug('result handler got EOFError/OSError -- exiting') + except Exception as e: + _handle_results_failure(cache, e) return if task is None: From 4160b06ee0d89290b22f08ddcdab5f399fe57144 Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 11:13:46 +0800 Subject: [PATCH 8/9] add news entry --- .../next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst diff --git a/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst b/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst new file mode 100644 index 00000000000000..6f2497978cdbf3 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst @@ -0,0 +1 @@ +Fix `multiprocessing.Pool` hang when a unpicklable object is returned From bc2c3a3a63a320fbcf6cbdeb57f534c15124000c Mon Sep 17 00:00:00 2001 From: Song Lin Date: Thu, 5 Mar 2026 11:19:05 +0800 Subject: [PATCH 9/9] fix default role error in lint check --- .../next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst b/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst index 6f2497978cdbf3..2bbb905e29846c 100644 --- a/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst +++ b/Misc/NEWS.d/next/Library/2026-03-05-11-13-07.gh-issue-103061.P_ldzn.rst @@ -1 +1 @@ -Fix `multiprocessing.Pool` hang when a unpicklable object is returned +Fix :code:`multiprocessing.Pool` hang when a unpicklable object is returned