| |
@@ -0,0 +1,75 @@
|
| |
+ From f9ad391cd2ae32627c506761dc0187a575fd514d Mon Sep 17 00:00:00 2001
|
| |
+ From: Greg <gdvalle@users.noreply.github.com>
|
| |
+ Date: Sun, 3 Feb 2019 23:07:36 -0600
|
| |
+ Subject: [PATCH] Fix warm shutdown crash from timeout handler (#271)
|
| |
+
|
| |
+ * Fix warm shutdown crash from timeout handler
|
| |
+
|
| |
+ This is an attempt at fixing #260 while attempting to not break #249.
|
| |
+
|
| |
+ I was unable to reproduce #249, so this is a bit of a guess, but my
|
| |
+ thinking is that we shouldn't need to do a deep copy here because we're
|
| |
+ not iterating over the values, just the keys, and a shallow copy
|
| |
+ protects us from that.
|
| |
+
|
| |
+ Also, assuming that the cache keys can change within the loop, this moves
|
| |
+ the copy inside the loop, and drops the explicit `list()` since we have
|
| |
+ a proper copy already.
|
| |
+
|
| |
+ * Add TimeoutHandler.handle_timeouts iterate test
|
| |
+
|
| |
+ Test that we're able to iterate with an ApplyResult, which contain a
|
| |
+ thread.Lock that is not pickleable.
|
| |
+ ---
|
| |
+ billiard/pool.py | 7 +++++--
|
| |
+ t/unit/test_pool.py | 10 ++++++++++
|
| |
+ 2 files changed, 15 insertions(+), 2 deletions(-)
|
| |
+
|
| |
+ diff --git a/billiard/pool.py b/billiard/pool.py
|
| |
+ index 8547728..5f31c57 100644
|
| |
+ --- a/billiard/pool.py
|
| |
+ +++ b/billiard/pool.py
|
| |
+ @@ -694,7 +694,6 @@ def _trywaitkill(self, worker):
|
| |
+ pass
|
| |
+
|
| |
+ def handle_timeouts(self):
|
| |
+ - cache = copy.deepcopy(self.cache)
|
| |
+ t_hard, t_soft = self.t_hard, self.t_soft
|
| |
+ dirty = set()
|
| |
+ on_soft_timeout = self.on_soft_timeout
|
| |
+ @@ -708,12 +707,16 @@ def _timed_out(start, timeout):
|
| |
+
|
| |
+ # Inner-loop
|
| |
+ while self._state == RUN:
|
| |
+ + # Perform a shallow copy before iteration because keys can change.
|
| |
+ + # A deep copy fails (on shutdown) due to thread.lock objects.
|
| |
+ + # https://github.com/celery/billiard/issues/260
|
| |
+ + cache = copy.copy(self.cache)
|
| |
+
|
| |
+ # Remove dirty items not in cache anymore
|
| |
+ if dirty:
|
| |
+ dirty = set(k for k in dirty if k in cache)
|
| |
+
|
| |
+ - for i, job in list(cache.items()):
|
| |
+ + for i, job in cache.items():
|
| |
+ ack_time = job._time_accepted
|
| |
+ soft_timeout = job._soft_timeout
|
| |
+ if soft_timeout is None:
|
| |
+ diff --git a/t/unit/test_pool.py b/t/unit/test_pool.py
|
| |
+ index 71b563b..565e8ec 100644
|
| |
+ --- a/t/unit/test_pool.py
|
| |
+ +++ b/t/unit/test_pool.py
|
| |
+ @@ -8,3 +8,13 @@ def test_raises(self):
|
| |
+ assert pool.did_start_ok() is True
|
| |
+ pool.close()
|
| |
+ pool.terminate()
|
| |
+ +
|
| |
+ + def test_timeout_handler_iterates_with_cache(self):
|
| |
+ + # Given a pool
|
| |
+ + pool = billiard.pool.Pool()
|
| |
+ + # If I have a cache containing async results
|
| |
+ + cache = {n: pool.apply_async(n) for n in range(4)}
|
| |
+ + # And a TimeoutHandler with that cache
|
| |
+ + timeout_handler = pool.TimeoutHandler(pool._pool, cache, 0, 0)
|
| |
+ + # If I call to handle the timeouts I expect no exception
|
| |
+ + next(timeout_handler.handle_timeouts())
|
| |
Fixes: https://github.com/celery/billiard/issues/260
An alternative solution might be to upgrade to the latest upstream version, but that is not compatible with the celery version in EPEL 7.
We have tested this patch with Red Hat's internal deployment of ODCS, and it indeed removes the traceback on shutdown; instead, celery correctly waits for the tasks to finish.