13c43c4
From 134cd670ed5c2773eec8ab6774ea0a6bdb2e7528 Mon Sep 17 00:00:00 2001
13c43c4
From: bbouters <bbouters@redhat.com>
13c43c4
Date: Wed, 3 Aug 2016 19:23:28 +0000
13c43c4
Subject: [PATCH] Fixes Qpid file descriptor leak
13c43c4
13c43c4
This is already fixed in upstream Kombu, so this is a
13c43c4
port of the upstream fix to the version we carry.
13c43c4
13c43c4
The Pulp issue is:
13c43c4
13c43c4
https://pulp.plan.io/issues/2124
13c43c4
---
13c43c4
 tests/transport/test_qpid.py | 48 +++++++++++++++-----------------------------
13c43c4
 transport/qpid.py            | 19 +++++++++---------
13c43c4
 2 files changed, 26 insertions(+), 41 deletions(-)
13c43c4
13c43c4
diff --git a/tests/transport/test_qpid.py b/tests/transport/test_qpid.py
13c43c4
index e9272ff..fb7a831 100644
13c43c4
--- a/kombu/tests/transport/test_qpid.py
13c43c4
+++ b/kombu/tests/transport/test_qpid.py
13c43c4
@@ -1427,21 +1427,6 @@ class TestTransportInit(Case):
13c43c4
         Transport(m)
13c43c4
         self.mock_base_Transport__init__.assert_called_once_with(m)
13c43c4
 
13c43c4
-    def test_transport___init___calls_os_pipe(self):
13c43c4
-        Transport(Mock())
13c43c4
-        self.mock_os.pipe.assert_called_once_with()
13c43c4
-
13c43c4
-    def test_transport___init___saves_os_pipe_file_descriptors(self):
13c43c4
-        transport = Transport(Mock())
13c43c4
-        self.assertIs(transport.r, self.mock_r)
13c43c4
-        self.assertIs(transport._w, self.mock_w)
13c43c4
-
13c43c4
-    def test_transport___init___sets_non_blocking_behavior_on_r_fd(self):
13c43c4
-        Transport(Mock())
13c43c4
-        self.mock_fcntl.fcntl.assert_called_once_with(
13c43c4
-            self.mock_r,  self.mock_fcntl.F_SETFL,  self.mock_os.O_NONBLOCK,
13c43c4
-        )
13c43c4
-
13c43c4
 
13c43c4
 @case_no_python3
13c43c4
 @case_no_pypy
13c43c4
@@ -1813,6 +1798,7 @@ class TestTransportOnReadable(Case):
13c43c4
         self.patch_b = patch.object(Transport, 'drain_events')
13c43c4
         self.mock_drain_events = self.patch_b.start()
13c43c4
         self.transport = Transport(Mock())
13c43c4
+        self.transport.register_with_event_loop(Mock(), Mock())
13c43c4
 
13c43c4
     def tearDown(self):
13c43c4
         self.patch_a.stop()
13c43c4
@@ -1904,25 +1890,23 @@ class TestTransport(ExtraAssertionsMixin, Case):
13c43c4
         result_params = my_transport.default_connection_params
13c43c4
         self.assertDictEqual(correct_params, result_params)
13c43c4
 
13c43c4
-    @patch('os.close')
13c43c4
-    def test_del(self, close):
13c43c4
+    @patch(QPID_MODULE + '.os.close')
13c43c4
+    def test_del_sync(self, close):
13c43c4
+        my_transport = Transport(self.mock_client)
13c43c4
+        my_transport.__del__()
13c43c4
+        self.assertFalse(close.called)
13c43c4
+
13c43c4
+    @patch(QPID_MODULE + '.os.close')
13c43c4
+    def test_del_async(self, close):
13c43c4
         my_transport = Transport(self.mock_client)
13c43c4
+        my_transport.register_with_event_loop(Mock(), Mock())
13c43c4
         my_transport.__del__()
13c43c4
-        self.assertEqual(
13c43c4
-            close.call_args_list,
13c43c4
-            [
13c43c4
-                ((my_transport.r,), {}),
13c43c4
-                ((my_transport._w,), {}),
13c43c4
-            ])
13c43c4
-
13c43c4
-    @patch('os.close')
13c43c4
-    def test_del_failed(self, close):
13c43c4
+        self.assertTrue(close.called)
13c43c4
+
13c43c4
+    @patch(QPID_MODULE + '.os.close')
13c43c4
+    def test_del_async_failed(self, close):
13c43c4
         close.side_effect = OSError()
13c43c4
         my_transport = Transport(self.mock_client)
13c43c4
+        my_transport.register_with_event_loop(Mock(), Mock())
13c43c4
         my_transport.__del__()
13c43c4
-        self.assertEqual(
13c43c4
-            close.call_args_list,
13c43c4
-            [
13c43c4
-                ((my_transport.r,), {}),
13c43c4
-                ((my_transport._w,), {}),
13c43c4
-            ])
13c43c4
+        self.assertTrue(close.called)
13c43c4
diff --git a/transport/qpid.py b/transport/qpid.py
13c43c4
index aa0d8e9..a8e78c4 100644
13c43c4
--- a/kombu/transport/qpid.py
13c43c4
+++ b/kombu/transport/qpid.py
13c43c4
@@ -1393,9 +1393,6 @@ class Transport(base.Transport):
13c43c4
         """
13c43c4
         self.verify_runtime_environment()
13c43c4
         super(Transport, self).__init__(*args, **kwargs)
13c43c4
-        self.r, self._w = os.pipe()
13c43c4
-        if fcntl is not None:
13c43c4
-            fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
13c43c4
         self.use_async_interface = False
13c43c4
 
13c43c4
     def verify_runtime_environment(self):
13c43c4
@@ -1522,6 +1519,9 @@ class Transport(base.Transport):
13c43c4
         :param loop: A reference to the external loop.
13c43c4
         :type loop: kombu.async.hub.Hub
13c43c4
         """
13c43c4
+        self.r, self._w = os.pipe()
13c43c4
+        if fcntl is not None:
13c43c4
+            fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
13c43c4
         self.use_async_interface = True
13c43c4
         loop.add_reader(self.r, self.on_readable, connection, loop)
13c43c4
 
13c43c4
@@ -1691,9 +1691,10 @@ class Transport(base.Transport):
13c43c4
         """
13c43c4
         Ensure file descriptors opened in __init__() are closed.
13c43c4
         """
13c43c4
-        for fd in (self.r, self._w):
13c43c4
-            try:
13c43c4
-                os.close(fd)
13c43c4
-            except OSError:
13c43c4
-                # ignored
13c43c4
-                pass
13c43c4
+        if self.use_async_interface:
13c43c4
+            for fd in (self.r, self._w):
13c43c4
+                try:
13c43c4
+                    os.close(fd)
13c43c4
+                except OSError:
13c43c4
+                    # ignored
13c43c4
+                    pass
13c43c4
-- 
13c43c4
2.7.4
13c43c4