Blob Blame History Raw
From f9ad391cd2ae32627c506761dc0187a575fd514d Mon Sep 17 00:00:00 2001
From: Greg <gdvalle@users.noreply.github.com>
Date: Sun, 3 Feb 2019 23:07:36 -0600
Subject: [PATCH] Fix warm shutdown crash from timeout handler (#271)

* Fix warm shutdown crash from timeout handler

This is an attempt at fixing #260 while attempting to not break #249.

I was unable to reproduce #249, so this is a bit of a guess, but my
thinking is that we shouldn't need to do a deep copy here because we're
not iterating over the values, just the keys, and a shallow copy
protects us from that.

Also, assuming that the cache keys can change within the loop, this moves
the copy inside the loop, and drops the explicit `list()` since we have
a proper copy already.

* Add TimeoutHandler.handle_timeouts iterate test

Test that we're able to iterate with an ApplyResult, which contain a
thread.Lock that is not pickleable.
---
 billiard/pool.py    |  7 +++++--
 t/unit/test_pool.py | 10 ++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/billiard/pool.py b/billiard/pool.py
index 8547728..5f31c57 100644
--- a/billiard/pool.py
+++ b/billiard/pool.py
@@ -694,7 +694,6 @@ def _trywaitkill(self, worker):
             pass
 
     def handle_timeouts(self):
-        cache = copy.deepcopy(self.cache)
         t_hard, t_soft = self.t_hard, self.t_soft
         dirty = set()
         on_soft_timeout = self.on_soft_timeout
@@ -708,12 +707,16 @@ def _timed_out(start, timeout):
 
         # Inner-loop
         while self._state == RUN:
+            # Perform a shallow copy before iteration because keys can change.
+            # A deep copy fails (on shutdown) due to thread.lock objects.
+            # https://github.com/celery/billiard/issues/260
+            cache = copy.copy(self.cache)
 
             # Remove dirty items not in cache anymore
             if dirty:
                 dirty = set(k for k in dirty if k in cache)
 
-            for i, job in list(cache.items()):
+            for i, job in cache.items():
                 ack_time = job._time_accepted
                 soft_timeout = job._soft_timeout
                 if soft_timeout is None:
diff --git a/t/unit/test_pool.py b/t/unit/test_pool.py
index 71b563b..565e8ec 100644
--- a/t/unit/test_pool.py
+++ b/t/unit/test_pool.py
@@ -8,3 +8,13 @@ def test_raises(self):
         assert pool.did_start_ok() is True
         pool.close()
         pool.terminate()
+
+    def test_timeout_handler_iterates_with_cache(self):
+        # Given a pool
+        pool = billiard.pool.Pool()
+        # If I have a cache containing async results
+        cache = {n: pool.apply_async(n) for n in range(4)}
+        # And a TimeoutHandler with that cache
+        timeout_handler = pool.TimeoutHandler(pool._pool, cache, 0, 0)
+        # If I call to handle the timeouts I expect no exception
+        next(timeout_handler.handle_timeouts())