diff --git a/563.patch b/563.patch new file mode 100644 index 0000000..434cfbd --- /dev/null +++ b/563.patch @@ -0,0 +1,330 @@ +From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001 +From: Brian Bouterse +Date: Fri, 22 Jan 2016 16:39:13 -0500 +Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages + +Implements a workaround for celery/celery#3019 +--- + kombu/transport/qpid.py | 5 +++++ + 1 file changed, 5 insertions(+) + +diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py +index 639d837..8b6301e 100644 +--- a/kombu/transport/qpid.py ++++ b/kombu/transport/qpid.py +@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the + from __future__ import absolute_import + + import os ++import random + import select + import socket + import ssl +@@ -938,6 +939,10 @@ class Channel(base.StdChannel): + + def _callback(qpid_message): + raw_message = qpid_message.content ++ ++ # workaround for https://github.com/celery/celery/issues/3019 ++ raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000) ++ + message = self.Message(self, raw_message) + delivery_tag = message.delivery_tag + self.qos.append(qpid_message, delivery_tag) +-- +2.4.3 + + +From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001 +From: Brian Bouterse +Date: Mon, 25 Jan 2016 11:43:20 -0500 +Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all + messages" + +This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998. +--- + kombu/transport/qpid.py | 5 ----- + 1 file changed, 5 deletions(-) + +diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py +index 8b6301e..639d837 100644 +--- a/kombu/transport/qpid.py ++++ b/kombu/transport/qpid.py +@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the + from __future__ import absolute_import + + import os +-import random + import select + import socket + import ssl +@@ -939,10 +938,6 @@ class Channel(base.StdChannel): + + def _callback(qpid_message): + raw_message = qpid_message.content +- +- # workaround for https://github.com/celery/celery/issues/3019 +- raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000) +- + message = self.Message(self, raw_message) + delivery_tag = message.delivery_tag + self.qos.append(qpid_message, delivery_tag) +-- +2.4.3 + + +From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001 +From: Brian Bouterse +Date: Mon, 25 Jan 2016 11:54:45 -0500 +Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages + +Implements a workaround for celery/celery#3019 +--- + kombu/tests/transport/test_qpid.py | 4 ---- + kombu/transport/qpid.py | 8 +++----- + 2 files changed, 3 insertions(+), 9 deletions(-) + +diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py +index 4131193..a72077b 100644 +--- a/kombu/tests/transport/test_qpid.py ++++ b/kombu/tests/transport/test_qpid.py +@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case): + self.assertIn('base64', Channel.codecs) + self.assertIsInstance(Channel.codecs['base64'], Base64) + +- def test_delivery_tags(self): +- """Test that _delivery_tags is using itertools""" +- self.assertIsInstance(Channel._delivery_tags, count) +- + def test_size(self): + """Test getting the number of messages in a queue specified by + name and returning them.""" +diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py +index 639d837..6d9b006 100644 +--- a/kombu/transport/qpid.py ++++ b/kombu/transport/qpid.py +@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the + from __future__ import absolute_import + + import os ++import random + import select + import socket + import ssl +@@ -368,9 +369,6 @@ class Channel(base.StdChannel): + #: Binary <-> ASCII codecs. + codecs = {'base64': Base64()} + +- #: counter used to generate delivery tags for this channel. +- _delivery_tags = count(1) +- + def __init__(self, connection, transport): + """Instantiate a Channel object. + +@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel): + - wraps the body as a buffer object, so that + :class:`qpid.messaging.endpoints.Sender` uses a content type + that can support arbitrarily large messages. +- - assigns a delivery_tag generated through self._delivery_tags ++ - assigns a random delivery_tag + - sets the exchange and routing_key info as delivery_info + + Internally uses :meth:`_put` to send the message synchronously. This +@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel): + props = message['properties'] + props.update( + body_encoding=body_encoding, +- delivery_tag=next(self._delivery_tags), ++ delivery_tag=random.randint(1, sys.maxint), + ) + props['delivery_info'].update( + exchange=exchange, +-- +2.4.3 + + +From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001 +From: Brian Bouterse +Date: Mon, 25 Jan 2016 16:07:53 -0500 +Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport + +celery/kombu#563 +--- + kombu/tests/transport/test_qpid.py | 3 ++- + kombu/transport/qpid.py | 43 +++++++++++++++++--------------------- + 2 files changed, 21 insertions(+), 25 deletions(-) + +diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py +index a72077b..a3894e4 100644 +--- a/kombu/tests/transport/test_qpid.py ++++ b/kombu/tests/transport/test_qpid.py +@@ -5,6 +5,7 @@ import ssl + import socket + import sys + import time ++import uuid + + from collections import Callable + from itertools import count +@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case): + mock_message['properties']['body_encoding'], mock_body_encoding, + ) + self.assertIsInstance( +- mock_message['properties']['delivery_tag'], int, ++ mock_message['properties']['delivery_tag'], uuid.UUID, + ) + self.assertIs( + mock_message['properties']['delivery_info']['exchange'], +diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py +index 6d9b006..b458d32 100644 +--- a/kombu/transport/qpid.py ++++ b/kombu/transport/qpid.py +@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the + from __future__ import absolute_import + + import os +-import random + import select + import socket + import ssl + import sys + import time ++import uuid + +-from itertools import count + from gettext import gettext as _ + + import amqp.protocol +@@ -160,7 +159,7 @@ class QoS(object): + ACKed asynchronously through a call to :meth:`ack`. Messages that are + received, but not ACKed will not be delivered by the broker to another + consumer until an ACK is received, or the session is closed. Messages +- are referred to using delivery_tag integers, which are unique per ++ are referred to using delivery_tag, which are unique per + :class:`Channel`. Delivery tags are managed outside of this object and + are passed in with a message to :meth:`append`. Un-ACKed messages can + be looked up from QoS using :meth:`get` and can be rejected and +@@ -214,15 +213,15 @@ class QoS(object): + def append(self, message, delivery_tag): + """Append message to the list of un-ACKed messages. + +- Add a message, referenced by the integer delivery_tag, for ACKing, ++ Add a message, referenced by the delivery_tag, for ACKing, + rejecting, or getting later. Messages are saved into an + :class:`~kombu.utils.compat.OrderedDict` by delivery_tag. + + :param message: A received message that has not yet been ACKed. + :type message: qpid.messaging.Message +- :param delivery_tag: An integer number to refer to this message by ++ :param delivery_tag: A UUID to refer to this message by + upon receipt. +- :type delivery_tag: int ++ :type delivery_tag: uuid.UUID + """ + self._not_yet_acked[delivery_tag] = message + +@@ -233,7 +232,7 @@ class QoS(object): + + :param delivery_tag: The delivery tag associated with the message + to be returned. +- :type delivery_tag: int ++ :type delivery_tag: uuid.UUID + + :return: An un-ACKed message that is looked up by delivery_tag. + :rtype: qpid.messaging.Message +@@ -248,7 +247,7 @@ class QoS(object): + + :param delivery_tag: the delivery tag associated with the message + to be acknowledged. +- :type delivery_tag: int ++ :type delivery_tag: uuid.UUID + """ + message = self._not_yet_acked.pop(delivery_tag) + self.session.acknowledge(message=message) +@@ -266,7 +265,7 @@ class QoS(object): + + :param delivery_tag: The delivery tag associated with the message + to be rejected. +- :type delivery_tag: int ++ :type delivery_tag: uuid.UUID + :keyword requeue: If True, the broker will be notified to requeue + the message. If False, the broker will be told to drop the + message entirely. In both cases, the message will be removed +@@ -311,10 +310,9 @@ class Channel(base.StdChannel): + Messages sent using this Channel are assigned a delivery_tag. The + delivery_tag is generated for a message as they are prepared for + sending by :meth:`basic_publish`. The delivery_tag is unique per +- Channel instance using :meth:`~itertools.count`. The delivery_tag has +- no meaningful context in other objects, and is only maintained in the +- memory of this object, and the underlying :class:`QoS` object that +- provides support. ++ Channel instance. The delivery_tag has no meaningful context in other ++ objects, and is only maintained in the memory of this object, and the ++ underlying :class:`QoS` object that provides support. + + Each Channel object instantiates exactly one :class:`QoS` object for + prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is +@@ -842,7 +840,7 @@ class Channel(base.StdChannel): + + :param delivery_tag: The delivery tag associated with the message + to be acknowledged. +- :type delivery_tag: int ++ :type delivery_tag: uuid.UUID + """ + self.qos.ack(delivery_tag) + +@@ -860,7 +858,7 @@ class Channel(base.StdChannel): + + :param delivery_tag: The delivery tag associated with the message + to be rejected. +- :type delivery_tag: int ++ :type delivery_tag: uuid.UUID + :keyword requeue: If False, the rejected message will be dropped by + the broker and not delivered to any other consumers. If True, + then the rejected message will be requeued for delivery to +@@ -901,10 +899,9 @@ class Channel(base.StdChannel): + handled by the caller of :meth:`~Transport.drain_events`. Messages + can be ACKed after being received through a call to :meth:`basic_ack`. + +- If no_ack is True, the messages are immediately ACKed to avoid a +- memory leak in qpid.messaging when messages go un-ACKed. The no_ack +- flag indicates that the receiver of the message does not intent to +- call :meth:`basic_ack`. ++ If no_ack is True, The no_ack flag indicates that the receiver of ++ the message will not call :meth:`basic_ack` later. Since the ++ message will not be ACKed later, it is ACKed immediately. + + :meth:`basic_consume` transforms the message object type prior to + calling the callback. Initially the message comes in as a +@@ -940,9 +937,7 @@ class Channel(base.StdChannel): + delivery_tag = message.delivery_tag + self.qos.append(qpid_message, delivery_tag) + if no_ack: +- # Celery will not ack this message later, so we should to +- # avoid a memory leak in qpid.messaging due to un-ACKed +- # messages. ++ # Celery will not ack this message later, so we should ack now + self.basic_ack(delivery_tag) + return callback(message) + +@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel): + - wraps the body as a buffer object, so that + :class:`qpid.messaging.endpoints.Sender` uses a content type + that can support arbitrarily large messages. +- - assigns a random delivery_tag ++ - sets delivery_tag to a random uuid.UUID + - sets the exchange and routing_key info as delivery_info + + Internally uses :meth:`_put` to send the message synchronously. This +@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel): + props = message['properties'] + props.update( + body_encoding=body_encoding, +- delivery_tag=random.randint(1, sys.maxint), ++ delivery_tag=uuid.uuid4(), + ) + props['delivery_info'].update( + exchange=exchange, +-- +2.4.3 + diff --git a/python-kombu.spec b/python-kombu.spec index 1b76544..4d38304 100644 --- a/python-kombu.spec +++ b/python-kombu.spec @@ -6,7 +6,7 @@ Name: python-%{srcname} Version: 3.0.33 -Release: 1%{?dist} +Release: 2%{?dist} Epoch: 1 Summary: An AMQP Messaging Framework for Python @@ -15,6 +15,7 @@ Group: Development/Languages License: BSD and Python URL: http://pypi.python.org/pypi/%{srcname} Source0: http://pypi.python.org/packages/source/k/%{srcname}/%{srcname}-%{version}.tar.gz +Patch0: 563.patch BuildArch: noarch BuildRequires: python2-devel @@ -90,6 +91,8 @@ This subpackage is for python3 %prep %setup -q -n %{srcname}-%{version} +%patch0 -p1 + %if 0%{?with_python3} cp -a . %{py3dir} %endif @@ -134,6 +137,9 @@ sed -i 's!/usr/bin/python$!/usr/bin/python3!' %{buildroot}/%{python3_sitelib}/ko %endif %changelog +* Wed Jan 27 2016 Brian Bouterse - 1:3.0.33-2 +- Adds patch to fix upstream issue 563 (rhbz#1300811) + * Wed Jan 20 2016 Brian Bouterse - 1:3.0.33-1 - Update to latest upstream version 3.0.33 (rhbz#1297116)