From f9ad391cd2ae32627c506761dc0187a575fd514d Mon Sep 17 00:00:00 2001
From: Greg <gdvalle@users.noreply.github.com>
Date: Sun, 3 Feb 2019 23:07:36 -0600
Subject: [PATCH] Fix warm shutdown crash from timeout handler (#271)
* Fix warm shutdown crash from timeout handler
This is an attempt at fixing #260 without breaking #249.
I was unable to reproduce #249, so this is a bit of a guess, but my
thinking is that we shouldn't need to do a deep copy here because we're
not iterating over the values, just the keys, and a shallow copy
protects us from that.
Also, assuming that the cache keys can change within the loop, this moves
the copy inside the loop, and drops the explicit `list()` since we have
a proper copy already.
* Add TimeoutHandler.handle_timeouts iterate test
Test that we're able to iterate with an ApplyResult, which contains a
thread.Lock that is not pickleable.
---
billiard/pool.py | 7 +++++--
t/unit/test_pool.py | 10 ++++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/billiard/pool.py b/billiard/pool.py
index 8547728..5f31c57 100644
--- a/billiard/pool.py
+++ b/billiard/pool.py
@@ -694,7 +694,6 @@ def _trywaitkill(self, worker):
pass
def handle_timeouts(self):
- cache = copy.deepcopy(self.cache)
t_hard, t_soft = self.t_hard, self.t_soft
dirty = set()
on_soft_timeout = self.on_soft_timeout
@@ -708,12 +707,16 @@ def _timed_out(start, timeout):
# Inner-loop
while self._state == RUN:
+ # Perform a shallow copy before iteration because keys can change.
+ # A deep copy fails (on shutdown) due to thread.lock objects.
+ # https://github.com/celery/billiard/issues/260
+ cache = copy.copy(self.cache)
# Remove dirty items not in cache anymore
if dirty:
dirty = set(k for k in dirty if k in cache)
- for i, job in list(cache.items()):
+ for i, job in cache.items():
ack_time = job._time_accepted
soft_timeout = job._soft_timeout
if soft_timeout is None:
diff --git a/t/unit/test_pool.py b/t/unit/test_pool.py
index 71b563b..565e8ec 100644
--- a/t/unit/test_pool.py
+++ b/t/unit/test_pool.py
@@ -8,3 +8,13 @@ def test_raises(self):
assert pool.did_start_ok() is True
pool.close()
pool.terminate()
+
+ def test_timeout_handler_iterates_with_cache(self):
+ # Given a pool
+ pool = billiard.pool.Pool()
+ # If I have a cache containing async results
+ cache = {n: pool.apply_async(n) for n in range(4)}
+ # And a TimeoutHandler with that cache
+ timeout_handler = pool.TimeoutHandler(pool._pool, cache, 0, 0)
+ # If I call to handle the timeouts I expect no exception
+ next(timeout_handler.handle_timeouts())