From acfaf15d5e53b3043809092d5151700a5d27807c Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Feb 26 2017 22:45:09 +0000 Subject: Adds upstream patch for 699 preventing the Qpid transport from working with Celery 4 --- diff --git a/2124.patch b/2124.patch deleted file mode 100644 index eab2b89..0000000 --- a/2124.patch +++ /dev/null @@ -1,135 +0,0 @@ -From 134cd670ed5c2773eec8ab6774ea0a6bdb2e7528 Mon Sep 17 00:00:00 2001 -From: bbouters -Date: Wed, 3 Aug 2016 19:23:28 +0000 -Subject: [PATCH] Fixes Qpid file descriptor leak - -This is already fixed in upstream Kombu, so this is a -port of the upstream fix to the version we carry. - -The Pulp issue is: - -https://pulp.plan.io/issues/2124 ---- - tests/transport/test_qpid.py | 48 +++++++++++++++----------------------------- - transport/qpid.py | 19 +++++++++--------- - 2 files changed, 26 insertions(+), 41 deletions(-) - -diff --git a/tests/transport/test_qpid.py b/tests/transport/test_qpid.py -index e9272ff..fb7a831 100644 ---- a/kombu/tests/transport/test_qpid.py -+++ b/kombu/tests/transport/test_qpid.py -@@ -1427,21 +1427,6 @@ class TestTransportInit(Case): - Transport(m) - self.mock_base_Transport__init__.assert_called_once_with(m) - -- def test_transport___init___calls_os_pipe(self): -- Transport(Mock()) -- self.mock_os.pipe.assert_called_once_with() -- -- def test_transport___init___saves_os_pipe_file_descriptors(self): -- transport = Transport(Mock()) -- self.assertIs(transport.r, self.mock_r) -- self.assertIs(transport._w, self.mock_w) -- -- def test_transport___init___sets_non_blocking_behavior_on_r_fd(self): -- Transport(Mock()) -- self.mock_fcntl.fcntl.assert_called_once_with( -- self.mock_r, self.mock_fcntl.F_SETFL, self.mock_os.O_NONBLOCK, -- ) -- - - @case_no_python3 - @case_no_pypy -@@ -1813,6 +1798,7 @@ class TestTransportOnReadable(Case): - self.patch_b = patch.object(Transport, 'drain_events') - self.mock_drain_events = self.patch_b.start() - self.transport = Transport(Mock()) -+ self.transport.register_with_event_loop(Mock(), Mock()) - - def tearDown(self): - self.patch_a.stop() -@@ -1904,25 +1890,23 @@ class TestTransport(ExtraAssertionsMixin, Case): - result_params = my_transport.default_connection_params - self.assertDictEqual(correct_params, result_params) - -- @patch('os.close') -- def test_del(self, close): -+ @patch(QPID_MODULE + '.os.close') -+ def test_del_sync(self, close): -+ my_transport = Transport(self.mock_client) -+ my_transport.__del__() -+ self.assertFalse(close.called) -+ -+ @patch(QPID_MODULE + '.os.close') -+ def test_del_async(self, close): - my_transport = Transport(self.mock_client) -+ my_transport.register_with_event_loop(Mock(), Mock()) - my_transport.__del__() -- self.assertEqual( -- close.call_args_list, -- [ -- ((my_transport.r,), {}), -- ((my_transport._w,), {}), -- ]) -- -- @patch('os.close') -- def test_del_failed(self, close): -+ self.assertTrue(close.called) -+ -+ @patch(QPID_MODULE + '.os.close') -+ def test_del_async_failed(self, close): - close.side_effect = OSError() - my_transport = Transport(self.mock_client) -+ my_transport.register_with_event_loop(Mock(), Mock()) - my_transport.__del__() -- self.assertEqual( -- close.call_args_list, -- [ -- ((my_transport.r,), {}), -- ((my_transport._w,), {}), -- ]) -+ self.assertTrue(close.called) -diff --git a/transport/qpid.py b/transport/qpid.py -index aa0d8e9..a8e78c4 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -1393,9 +1393,6 @@ class Transport(base.Transport): - """ - self.verify_runtime_environment() - super(Transport, self).__init__(*args, **kwargs) -- self.r, self._w = os.pipe() -- if fcntl is not None: -- fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK) - self.use_async_interface = False - - def verify_runtime_environment(self): -@@ -1522,6 +1519,9 @@ class Transport(base.Transport): - :param loop: A reference to the external loop. - :type loop: kombu.async.hub.Hub - """ -+ self.r, self._w = os.pipe() -+ if fcntl is not None: -+ fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK) - self.use_async_interface = True - loop.add_reader(self.r, self.on_readable, connection, loop) - -@@ -1691,9 +1691,10 @@ class Transport(base.Transport): - """ - Ensure file descriptors opened in __init__() are closed. - """ -- for fd in (self.r, self._w): -- try: -- os.close(fd) -- except OSError: -- # ignored -- pass -+ if self.use_async_interface: -+ for fd in (self.r, self._w): -+ try: -+ os.close(fd) -+ except OSError: -+ # ignored -+ pass --- -2.7.4 - diff --git a/563.patch b/563.patch deleted file mode 100644 index 434cfbd..0000000 --- a/563.patch +++ /dev/null @@ -1,330 +0,0 @@ -From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001 -From: Brian Bouterse -Date: Fri, 22 Jan 2016 16:39:13 -0500 -Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages - -Implements a workaround for celery/celery#3019 ---- - kombu/transport/qpid.py | 5 +++++ - 1 file changed, 5 insertions(+) - -diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py -index 639d837..8b6301e 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the - from __future__ import absolute_import - - import os -+import random - import select - import socket - import ssl -@@ -938,6 +939,10 @@ class Channel(base.StdChannel): - - def _callback(qpid_message): - raw_message = qpid_message.content -+ -+ # workaround for https://github.com/celery/celery/issues/3019 -+ raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000) -+ - message = self.Message(self, raw_message) - delivery_tag = message.delivery_tag - self.qos.append(qpid_message, delivery_tag) --- -2.4.3 - - -From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001 -From: Brian Bouterse -Date: Mon, 25 Jan 2016 11:43:20 -0500 -Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all - messages" - -This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998. ---- - kombu/transport/qpid.py | 5 ----- - 1 file changed, 5 deletions(-) - -diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py -index 8b6301e..639d837 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the - from __future__ import absolute_import - - import os --import random - import select - import socket - import ssl -@@ -939,10 +938,6 @@ class Channel(base.StdChannel): - - def _callback(qpid_message): - raw_message = qpid_message.content -- -- # workaround for https://github.com/celery/celery/issues/3019 -- raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000) -- - message = self.Message(self, raw_message) - delivery_tag = message.delivery_tag - self.qos.append(qpid_message, delivery_tag) --- -2.4.3 - - -From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001 -From: Brian Bouterse -Date: Mon, 25 Jan 2016 11:54:45 -0500 -Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages - -Implements a workaround for celery/celery#3019 ---- - kombu/tests/transport/test_qpid.py | 4 ---- - kombu/transport/qpid.py | 8 +++----- - 2 files changed, 3 insertions(+), 9 deletions(-) - -diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py -index 4131193..a72077b 100644 ---- a/kombu/tests/transport/test_qpid.py -+++ b/kombu/tests/transport/test_qpid.py -@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case): - self.assertIn('base64', Channel.codecs) - self.assertIsInstance(Channel.codecs['base64'], Base64) - -- def test_delivery_tags(self): -- """Test that _delivery_tags is using itertools""" -- self.assertIsInstance(Channel._delivery_tags, count) -- - def test_size(self): - """Test getting the number of messages in a queue specified by - name and returning them.""" -diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py -index 639d837..6d9b006 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the - from __future__ import absolute_import - - import os -+import random - import select - import socket - import ssl -@@ -368,9 +369,6 @@ class Channel(base.StdChannel): - #: Binary <-> ASCII codecs. - codecs = {'base64': Base64()} - -- #: counter used to generate delivery tags for this channel. -- _delivery_tags = count(1) -- - def __init__(self, connection, transport): - """Instantiate a Channel object. - -@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel): - - wraps the body as a buffer object, so that - :class:`qpid.messaging.endpoints.Sender` uses a content type - that can support arbitrarily large messages. -- - assigns a delivery_tag generated through self._delivery_tags -+ - assigns a random delivery_tag - - sets the exchange and routing_key info as delivery_info - - Internally uses :meth:`_put` to send the message synchronously. This -@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel): - props = message['properties'] - props.update( - body_encoding=body_encoding, -- delivery_tag=next(self._delivery_tags), -+ delivery_tag=random.randint(1, sys.maxint), - ) - props['delivery_info'].update( - exchange=exchange, --- -2.4.3 - - -From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001 -From: Brian Bouterse -Date: Mon, 25 Jan 2016 16:07:53 -0500 -Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport - -celery/kombu#563 ---- - kombu/tests/transport/test_qpid.py | 3 ++- - kombu/transport/qpid.py | 43 +++++++++++++++++--------------------- - 2 files changed, 21 insertions(+), 25 deletions(-) - -diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py -index a72077b..a3894e4 100644 ---- a/kombu/tests/transport/test_qpid.py -+++ b/kombu/tests/transport/test_qpid.py -@@ -5,6 +5,7 @@ import ssl - import socket - import sys - import time -+import uuid - - from collections import Callable - from itertools import count -@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case): - mock_message['properties']['body_encoding'], mock_body_encoding, - ) - self.assertIsInstance( -- mock_message['properties']['delivery_tag'], int, -+ mock_message['properties']['delivery_tag'], uuid.UUID, - ) - self.assertIs( - mock_message['properties']['delivery_info']['exchange'], -diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py -index 6d9b006..b458d32 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the - from __future__ import absolute_import - - import os --import random - import select - import socket - import ssl - import sys - import time -+import uuid - --from itertools import count - from gettext import gettext as _ - - import amqp.protocol -@@ -160,7 +159,7 @@ class QoS(object): - ACKed asynchronously through a call to :meth:`ack`. Messages that are - received, but not ACKed will not be delivered by the broker to another - consumer until an ACK is received, or the session is closed. Messages -- are referred to using delivery_tag integers, which are unique per -+ are referred to using delivery_tag, which are unique per - :class:`Channel`. Delivery tags are managed outside of this object and - are passed in with a message to :meth:`append`. Un-ACKed messages can - be looked up from QoS using :meth:`get` and can be rejected and -@@ -214,15 +213,15 @@ class QoS(object): - def append(self, message, delivery_tag): - """Append message to the list of un-ACKed messages. - -- Add a message, referenced by the integer delivery_tag, for ACKing, -+ Add a message, referenced by the delivery_tag, for ACKing, - rejecting, or getting later. Messages are saved into an - :class:`~kombu.utils.compat.OrderedDict` by delivery_tag. - - :param message: A received message that has not yet been ACKed. - :type message: qpid.messaging.Message -- :param delivery_tag: An integer number to refer to this message by -+ :param delivery_tag: A UUID to refer to this message by - upon receipt. -- :type delivery_tag: int -+ :type delivery_tag: uuid.UUID - """ - self._not_yet_acked[delivery_tag] = message - -@@ -233,7 +232,7 @@ class QoS(object): - - :param delivery_tag: The delivery tag associated with the message - to be returned. -- :type delivery_tag: int -+ :type delivery_tag: uuid.UUID - - :return: An un-ACKed message that is looked up by delivery_tag. - :rtype: qpid.messaging.Message -@@ -248,7 +247,7 @@ class QoS(object): - - :param delivery_tag: the delivery tag associated with the message - to be acknowledged. -- :type delivery_tag: int -+ :type delivery_tag: uuid.UUID - """ - message = self._not_yet_acked.pop(delivery_tag) - self.session.acknowledge(message=message) -@@ -266,7 +265,7 @@ class QoS(object): - - :param delivery_tag: The delivery tag associated with the message - to be rejected. -- :type delivery_tag: int -+ :type delivery_tag: uuid.UUID - :keyword requeue: If True, the broker will be notified to requeue - the message. If False, the broker will be told to drop the - message entirely. In both cases, the message will be removed -@@ -311,10 +310,9 @@ class Channel(base.StdChannel): - Messages sent using this Channel are assigned a delivery_tag. The - delivery_tag is generated for a message as they are prepared for - sending by :meth:`basic_publish`. The delivery_tag is unique per -- Channel instance using :meth:`~itertools.count`. The delivery_tag has -- no meaningful context in other objects, and is only maintained in the -- memory of this object, and the underlying :class:`QoS` object that -- provides support. -+ Channel instance. The delivery_tag has no meaningful context in other -+ objects, and is only maintained in the memory of this object, and the -+ underlying :class:`QoS` object that provides support. - - Each Channel object instantiates exactly one :class:`QoS` object for - prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is -@@ -842,7 +840,7 @@ class Channel(base.StdChannel): - - :param delivery_tag: The delivery tag associated with the message - to be acknowledged. -- :type delivery_tag: int -+ :type delivery_tag: uuid.UUID - """ - self.qos.ack(delivery_tag) - -@@ -860,7 +858,7 @@ class Channel(base.StdChannel): - - :param delivery_tag: The delivery tag associated with the message - to be rejected. -- :type delivery_tag: int -+ :type delivery_tag: uuid.UUID - :keyword requeue: If False, the rejected message will be dropped by - the broker and not delivered to any other consumers. If True, - then the rejected message will be requeued for delivery to -@@ -901,10 +899,9 @@ class Channel(base.StdChannel): - handled by the caller of :meth:`~Transport.drain_events`. Messages - can be ACKed after being received through a call to :meth:`basic_ack`. - -- If no_ack is True, the messages are immediately ACKed to avoid a -- memory leak in qpid.messaging when messages go un-ACKed. The no_ack -- flag indicates that the receiver of the message does not intent to -- call :meth:`basic_ack`. -+ If no_ack is True, The no_ack flag indicates that the receiver of -+ the message will not call :meth:`basic_ack` later. Since the -+ message will not be ACKed later, it is ACKed immediately. - - :meth:`basic_consume` transforms the message object type prior to - calling the callback. Initially the message comes in as a -@@ -940,9 +937,7 @@ class Channel(base.StdChannel): - delivery_tag = message.delivery_tag - self.qos.append(qpid_message, delivery_tag) - if no_ack: -- # Celery will not ack this message later, so we should to -- # avoid a memory leak in qpid.messaging due to un-ACKed -- # messages. -+ # Celery will not ack this message later, so we should ack now - self.basic_ack(delivery_tag) - return callback(message) - -@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel): - - wraps the body as a buffer object, so that - :class:`qpid.messaging.endpoints.Sender` uses a content type - that can support arbitrarily large messages. -- - assigns a random delivery_tag -+ - sets delivery_tag to a random uuid.UUID - - sets the exchange and routing_key info as delivery_info - - Internally uses :meth:`_put` to send the message synchronously. This -@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel): - props = message['properties'] - props.update( - body_encoding=body_encoding, -- delivery_tag=random.randint(1, sys.maxint), -+ delivery_tag=uuid.uuid4(), - ) - props['delivery_info'].update( - exchange=exchange, --- -2.4.3 - diff --git a/569.patch b/569.patch deleted file mode 100644 index 45d890f..0000000 --- a/569.patch +++ /dev/null @@ -1,37 +0,0 @@ -From 6115b1a9be4de41f2c7cbb855405bfd60eff81fc Mon Sep 17 00:00:00 2001 -From: Brian Bouterse -Date: Tue, 9 Feb 2016 14:37:09 -0500 -Subject: [PATCH] Adds asynchronous error handling to Qpid transport - -Fixes #568 ---- - kombu/transport/qpid.py | 9 +++++++++ - 1 file changed, 9 insertions(+) - -diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py -index b458d32..081c6c6 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -1437,6 +1437,9 @@ def verify_runtime_environment(self): - def _qpid_session_ready(self): - os.write(self._w, '0') - -+ def _qpid_exception(self, obj_with_exception): -+ os.write(self._w, 'e') -+ - def on_readable(self, connection, loop): - """Handle any messages associated with this Transport. - -@@ -1594,6 +1597,12 @@ def establish_connection(self): - conn.client = self.client - self.session = conn.get_qpid_connection().session() - self.session.set_message_received_handler(self._qpid_session_ready) -+ conn.get_qpid_connection().set_exception_notify_handler( -+ self._qpid_exception -+ ) -+ self.session.set_exception_notify_handler( -+ self._qpid_exception -+ ) - return conn - - def close_connection(self, connection): diff --git a/571.patch b/571.patch deleted file mode 100644 index 26e9357..0000000 --- a/571.patch +++ /dev/null @@ -1,114 +0,0 @@ -From e4149280e2e72e73985fdd7e040c833fc61a88a2 Mon Sep 17 00:00:00 2001 -From: Brian Bouterse -Date: Tue, 16 Feb 2016 14:56:23 -0500 -Subject: [PATCH] Updates Qpid transport with updated names of Qpid async - interface - ---- - kombu/tests/transport/test_qpid.py | 38 +++++++++++++++++++++++++++++--------- - kombu/transport/qpid.py | 16 +++++++++------- - 2 files changed, 38 insertions(+), 16 deletions(-) - -diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py -index a3894e4..4340443 100644 ---- a/kombu/tests/transport/test_qpid.py -+++ b/kombu/tests/transport/test_qpid.py -@@ -1706,13 +1706,29 @@ def test_transport_establish_conn_uses_hostname_if_not_default(self): - transport='tcp', - ) - -- def test_transport_sets_qpid_message_received_handler(self): -+ def test_transport_sets_qpid_message_ready_handler(self): - self.transport.establish_connection() -- qpid_conn = self.mock_conn.return_value.get_qpid_connection -- new_mock_session = qpid_conn.return_value.session.return_value -- mock_set_callback = new_mock_session.set_message_received_handler -- expected_callback = self.transport._qpid_session_ready -- mock_set_callback.assert_called_once_with(expected_callback) -+ qpid_conn_call = self.mock_conn.return_value.get_qpid_connection -+ mock_session = qpid_conn_call.return_value.session.return_value -+ mock_set_callback = mock_session.set_message_received_notify_handler -+ expected_msg_callback = self.transport._qpid_message_ready_handler -+ mock_set_callback.assert_called_once_with(expected_msg_callback) -+ -+ def test_transport_sets_session_exception_handler(self): -+ self.transport.establish_connection() -+ qpid_conn_call = self.mock_conn.return_value.get_qpid_connection -+ mock_session = qpid_conn_call.return_value.session.return_value -+ mock_set_callback = mock_session.set_async_exception_notify_handler -+ exc_callback = self.transport._qpid_async_exception_notify_handler -+ mock_set_callback.assert_called_once_with(exc_callback) -+ -+ def test_transport_sets_connection_exception_handler(self): -+ self.transport.establish_connection() -+ qpid_conn_call = self.mock_conn.return_value.get_qpid_connection -+ qpid_conn = qpid_conn_call.return_value -+ mock_set_callback = qpid_conn.set_async_exception_notify_handler -+ exc_callback = self.transport._qpid_async_exception_notify_handler -+ mock_set_callback.assert_called_once_with(exc_callback) - - - @case_no_python3 -@@ -1766,7 +1782,7 @@ def test_transport_register_with_event_loop_calls_add_reader(self): - @case_no_python3 - @case_no_pypy - @disable_runtime_dependency_check --class TestTransportQpidSessionReady(Case): -+class TestTransportQpidCallbackHandlers(Case): - - def setUp(self): - self.patch_a = patch(QPID_MODULE + '.os.write') -@@ -1776,10 +1792,14 @@ def setUp(self): - def tearDown(self): - self.patch_a.stop() - -- def test_transport__qpid_session_ready_writes_symbol_to_fd(self): -- self.transport._qpid_session_ready() -+ def test__qpid_message_ready_handler_writes_symbol_to_fd(self): -+ self.transport._qpid_message_ready_handler(Mock()) - self.mock_os_write.assert_called_once_with(self.transport._w, '0') - -+ def test__qpid_async_exception_notify_handler_writes_symbol_to_fd(self): -+ self.transport._qpid_async_exception_notify_handler(Mock(), Mock()) -+ self.mock_os_write.assert_called_once_with(self.transport._w, 'e') -+ - - @case_no_python3 - @case_no_pypy -diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py -index 081c6c6..55cafdd 100644 ---- a/kombu/transport/qpid.py -+++ b/kombu/transport/qpid.py -@@ -1434,10 +1434,10 @@ def verify_runtime_environment(self): - 'with your package manager. You can also try `pip install ' - 'qpid-python`.') - -- def _qpid_session_ready(self): -+ def _qpid_message_ready_handler(self, session): - os.write(self._w, '0') - -- def _qpid_exception(self, obj_with_exception): -+ def _qpid_async_exception_notify_handler(self, obj_with_exception, exc): - os.write(self._w, 'e') - - def on_readable(self, connection, loop): -@@ -1596,12 +1596,14 @@ def establish_connection(self): - conn = self.Connection(**opts) - conn.client = self.client - self.session = conn.get_qpid_connection().session() -- self.session.set_message_received_handler(self._qpid_session_ready) -- conn.get_qpid_connection().set_exception_notify_handler( -- self._qpid_exception -+ self.session.set_message_received_notify_handler( -+ self._qpid_message_ready_handler - ) -- self.session.set_exception_notify_handler( -- self._qpid_exception -+ conn.get_qpid_connection().set_async_exception_notify_handler( -+ self._qpid_async_exception_notify_handler -+ ) -+ self.session.set_async_exception_notify_handler( -+ self._qpid_async_exception_notify_handler - ) - return conn - diff --git a/577.patch b/577.patch deleted file mode 100644 index eb3a66a..0000000 --- a/577.patch +++ /dev/null @@ -1,43 +0,0 @@ ---- /kombu/transport/qpid.py 2016-04-01 16:59:49.698901199 -0400 -+++ /kombu/transport/qpid.py.modified 2016-04-01 16:59:43.643873453 -0400 -@@ -1396,6 +1396,7 @@ - self.r, self._w = os.pipe() - if fcntl is not None: - fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK) -+ self.use_async_interface = False - - def verify_runtime_environment(self): - """Verify that the runtime environment is acceptable. -@@ -1435,10 +1436,12 @@ - 'qpid-python`.') - - def _qpid_message_ready_handler(self, session): -- os.write(self._w, '0') -+ if self.use_async_interface: -+ os.write(self._w, '0') - - def _qpid_async_exception_notify_handler(self, obj_with_exception, exc): -- os.write(self._w, 'e') -+ if self.use_async_interface: -+ os.write(self._w, 'e') - - def on_readable(self, connection, loop): - """Handle any messages associated with this Transport. -@@ -1519,6 +1522,7 @@ - :param loop: A reference to the external loop. - :type loop: kombu.async.hub.Hub - """ -+ self.use_async_interface = True - loop.add_reader(self.r, self.on_readable, connection, loop) - - def establish_connection(self): ---- /kombu/tests/transport/test_qpid.py 2016-04-01 17:10:59.140637210 -0400 -+++ /kombu/tests/transport/test_qpid.py.modified 2016-04-01 17:13:40.708274142 -0400 -@@ -1788,6 +1788,7 @@ - self.patch_a = patch(QPID_MODULE + '.os.write') - self.mock_os_write = self.patch_a.start() - self.transport = Transport(Mock()) -+ self.transport.register_with_event_loop(Mock(), Mock()) - - def tearDown(self): - self.patch_a.stop() diff --git a/python-kombu.spec b/python-kombu.spec index ae28dbc..99a2ee3 100644 --- a/python-kombu.spec +++ b/python-kombu.spec @@ -6,7 +6,7 @@ Name: python-%{srcname} Version: 4.0.2 -Release: 3%{?dist} +Release: 4%{?dist} Epoch: 1 Summary: An AMQP Messaging Framework for Python @@ -15,6 +15,9 @@ License: BSD and Python URL: http://kombu.readthedocs.org/ Source0: https://github.com/celery/kombu/archive/v%{version}.tar.gz#/%{srcname}-%{version}.tar.gz +# This can be removed in 4.0.3+ +Patch0: qpid-transport-works-with-celery-4.patch + BuildArch: noarch BuildRequires: python2-devel @@ -97,7 +100,7 @@ also provide proven and tested solutions to common messaging problems. %endif %prep -%autosetup -n %{srcname}-%{version} +%autosetup -n %{srcname}-%{version} -p1 %build %py2_build @@ -131,6 +134,9 @@ also provide proven and tested solutions to common messaging problems. %endif %changelog +* Sun Feb 26 2017 Brian Bouterse - 1:4.0.2-4 +- Adds upstream patch for 699 preventing the Qpid transport from working with Celery 4 + * Sat Feb 11 2017 Fedora Release Engineering - 1:4.0.2-3 - Rebuilt for https://fedoraproject.org/wiki/Fedora_26_Mass_Rebuild diff --git a/qpid-transport-works-with-celery-4.patch b/qpid-transport-works-with-celery-4.patch new file mode 100644 index 0000000..bf527f1 --- /dev/null +++ b/qpid-transport-works-with-celery-4.patch @@ -0,0 +1,26 @@ +diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py +index 2204624..35da356 100644 +--- a/kombu/transport/qpid.py ++++ b/kombu/transport/qpid.py +@@ -850,7 +850,7 @@ def basic_get(self, queue, no_ack=False, **kwargs): + except Empty: + pass + +- def basic_ack(self, delivery_tag): ++ def basic_ack(self, delivery_tag, multiple=False): + """Acknowledge a message by delivery_tag. + + Acknowledges a message referenced by delivery_tag. Messages can +@@ -864,8 +864,12 @@ def basic_ack(self, delivery_tag): + :param delivery_tag: The delivery tag associated with the message + to be acknowledged. + :type delivery_tag: uuid.UUID ++ :param multiple: not implemented. If set to True an AssertionError ++ is raised. ++ :type multiple: bool + + """ ++ assert multiple is False + self.qos.ack(delivery_tag) + + def basic_reject(self, delivery_tag, requeue=False):