Blob Blame History Raw
From 82d7bdc584e911385e286977c150209655af481c Mon Sep 17 00:00:00 2001
From: Armin Le Grand <Armin.Le.Grand@cib.de>
Date: Wed, 16 Mar 2016 15:14:13 +0100
Subject: [PATCH 2/3] tdf#93553 limit parallelism at zip save time to useful
 amount
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

At ODT export time writing and zipping comtained data packages is nicely
parallelized, but not limited to an upper bounds of threads to use.
Together with memory consumption this makes ressource usage and runtime
behaviour bad to crashing (mostly on 32bit).
I have now limited the processing dependent on the number of available
cores to get a good processing/ressource ratio. The result uses much less
memory, is faster and runs on 32bit systems.

Reviewed-on: https://gerrit.libreoffice.org/23305
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Noel Grandin <noelgrandin@gmail.com>
(cherry picked from commit 7e2ea27e5d56f5cf767a6718a0f5edc28e24af14)

Missing include
(cherry picked from commit 3cdc8c27672e7e9253c9ca9abfc611a0f789da9c)

Reviewed-on: https://gerrit.libreoffice.org/24807
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Caolán McNamara <caolanm@redhat.com>
Tested-by: Caolán McNamara <caolanm@redhat.com>
(cherry picked from commit 97f46313d304e63e8efc9761ac1e490b186683c1)

Change-Id: I8bd516a9a0cefd644f5d7001214bc717f29770ab
---
 package/inc/ZipOutputEntry.hxx                 |  10 ++-
 package/inc/ZipOutputStream.hxx                |  10 +++
 package/source/zipapi/ZipOutputEntry.cxx       |   6 +-
 package/source/zipapi/ZipOutputStream.cxx      | 102 ++++++++++++++++++-------
 package/source/zippackage/ZipPackageStream.cxx |  11 +++
 5 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx
index 3a6447c..f452619 100644
--- a/package/inc/ZipOutputEntry.hxx
+++ b/package/inc/ZipOutputEntry.hxx
@@ -28,6 +28,7 @@
 
 #include <package/Deflater.hxx>
 #include <CRC32.hxx>
+#include <atomic>
 
 struct ZipEntry;
 class ZipPackageBuffer;
@@ -35,6 +36,9 @@ class ZipPackageStream;
 
 class ZipOutputEntry
 {
+    // allow only DeflateThread to change m_bFinished using setFinished()
+    friend class DeflateThread;
+
     ::com::sun::star::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
     ZipUtils::Deflater m_aDeflater;
     css::uno::Reference< css::uno::XComponentContext > m_xContext;
@@ -48,8 +52,9 @@ class ZipOutputEntry
     CRC32               m_aCRC;
     ZipEntry            *m_pCurrentEntry;
     sal_Int16           m_nDigested;
-    bool                m_bEncryptCurrentEntry;
     ZipPackageStream*   m_pCurrentStream;
+    bool                m_bEncryptCurrentEntry;
+    std::atomic<bool>   m_bFinished;
 
 public:
     ZipOutputEntry(
@@ -78,7 +83,10 @@ public:
     void closeEntry();
     void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
 
+    bool isFinished() const { return m_bFinished; }
+
 private:
+    void setFinished() { m_bFinished = true; }
     void doDeflate();
 };
 
diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx
index d6d7853..88e19bb 100644
--- a/package/inc/ZipOutputStream.hxx
+++ b/package/inc/ZipOutputStream.hxx
@@ -69,6 +69,16 @@ private:
         throw(::com::sun::star::io::IOException, ::com::sun::star::uno::RuntimeException);
     void writeEXT( const ZipEntry &rEntry )
         throw(::com::sun::star::io::IOException, ::com::sun::star::uno::RuntimeException);
+
+    // ScheduledThread handling helpers
+    void consumeScheduledThreadEntry(ZipOutputEntry* pCandidate);
+    void consumeFinishedScheduledThreadEntries();
+    void consumeAllScheduledThreadEntries();
+
+public:
+    void reduceScheduledThreadsToGivenNumberOrLess(
+        sal_Int32 nThreads,
+        sal_Int32 nWaitTimeInTenthSeconds);
 };
 
 #endif
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 47ecbb4..6f983d3 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry(
 , m_xOutStream(rxOutput)
 , m_pCurrentEntry(&rEntry)
 , m_nDigested(0)
-, m_bEncryptCurrentEntry(bEncrypt)
 , m_pCurrentStream(pStream)
+, m_bEncryptCurrentEntry(bEncrypt)
+, m_bFinished(false)
 {
     assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
     assert(m_xOutStream.is());
@@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry(
 , m_xContext(rxContext)
 , m_pCurrentEntry(&rEntry)
 , m_nDigested(0)
-, m_bEncryptCurrentEntry(bEncrypt)
 , m_pCurrentStream(pStream)
+, m_bEncryptCurrentEntry(bEncrypt)
+, m_bFinished(false)
 {
     assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
     if (m_bEncryptCurrentEntry)
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index be261cd..d77e13b 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -93,42 +93,92 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
     m_pCurrentEntry = NULL;
 }
 
-void ZipOutputStream::finish()
-    throw(IOException, RuntimeException)
+void ZipOutputStream::consumeScheduledThreadEntry(ZipOutputEntry* pCandidate)
 {
-    assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+    //Any exceptions thrown in the threads were caught and stored for now
+    ::css::uno::Any aCaughtException(pCandidate->getParallelDeflateException());
+    if (aCaughtException.hasValue())
+        ::cppu::throwException(aCaughtException);
 
-    // Wait for all threads to finish & write
-    m_rSharedThreadPool.waitUntilEmpty();
-    for (size_t i = 0; i < m_aEntries.size(); i++)
+    writeLOC(pCandidate->getZipEntry(), pCandidate->isEncrypt());
+
+    sal_Int32 nRead;
+    uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
+    uno::Reference< io::XInputStream > xInput = pCandidate->getData();
+    do
     {
-        //Any exceptions thrown in the threads were caught and stored for now
-        ::css::uno::Any aCaughtException(m_aEntries[i]->getParallelDeflateException());
-        if (aCaughtException.hasValue())
-            ::cppu::throwException(aCaughtException);
+        nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
+        if (nRead < n_ConstBufferSize)
+            aSequence.realloc(nRead);
 
-        writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt());
+        rawWrite(aSequence);
+    }
+    while (nRead == n_ConstBufferSize);
+    xInput.clear();
 
-        sal_Int32 nRead;
-        uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
-        uno::Reference< io::XInputStream > xInput = m_aEntries[i]->getData();
-        do
-        {
-            nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
-            if (nRead < n_ConstBufferSize)
-                aSequence.realloc(nRead);
+    rawCloseEntry(pCandidate->isEncrypt());
+
+    pCandidate->getZipPackageStream()->successfullyWritten(pCandidate->getZipEntry());
+    pCandidate->deleteBufferFile();
+    delete pCandidate;
+}
 
-            rawWrite(aSequence);
+void ZipOutputStream::consumeFinishedScheduledThreadEntries()
+{
+    std::vector< ZipOutputEntry* > aNonFinishedEntries;
+
+    for(auto aIter = m_aEntries.begin(); aIter != m_aEntries.end(); ++aIter)
+    {
+        if((*aIter)->isFinished())
+        {
+            consumeScheduledThreadEntry(*aIter);
         }
-        while (nRead == n_ConstBufferSize);
-        xInput.clear();
+        else
+        {
+            aNonFinishedEntries.push_back(*aIter);
+        }
+    }
+
+    // always reset to non-consumed entries
+    m_aEntries = aNonFinishedEntries;
+}
 
-        rawCloseEntry(m_aEntries[i]->isEncrypt());
+void ZipOutputStream::consumeAllScheduledThreadEntries()
+{
+    while(!m_aEntries.empty())
+    {
+        ZipOutputEntry* pCandidate = m_aEntries.back();
+        m_aEntries.pop_back();
+        consumeScheduledThreadEntry(pCandidate);
+    }
+}
+
+void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads, sal_Int32 nWaitTimeInTenthSeconds)
+{
+    while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
+    {
+        consumeFinishedScheduledThreadEntries();
 
-        m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry());
-        m_aEntries[i]->deleteBufferFile();
-        delete m_aEntries[i];
+        if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
+        {
+            TimeValue aTimeValue;
+            aTimeValue.Seconds = 0;
+            aTimeValue.Nanosec = 100000 * nWaitTimeInTenthSeconds;
+            osl_waitThread(&aTimeValue);
+        }
     }
+}
+
+void ZipOutputStream::finish()
+    throw(IOException, RuntimeException)
+{
+    assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+
+    // Wait for all threads to finish & write
+    m_rSharedThreadPool.waitUntilEmpty();
+
+    // consume all processed entries
+    consumeAllScheduledThreadEntries();
 
     sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
     for (size_t i = 0; i < m_aZipList.size(); i++)
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index 2ed7f57..9c3f991 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -55,6 +55,9 @@
 
 #include <PackageConstants.hxx>
 
+#include <algorithm>
+#include <thread>
+
 using namespace com::sun::star::packages::zip::ZipConstants;
 using namespace com::sun::star::packages::zip;
 using namespace com::sun::star::uno;
@@ -478,6 +481,7 @@ private:
             deflateZipEntry(mpEntry, mxInStream);
             mxInStream.clear();
             mpEntry->closeBufferFile();
+            mpEntry->setFinished();
         }
         catch (const uno::Exception&)
         {
@@ -824,6 +828,13 @@ bool ZipPackageStream::saveChild(
 
                 if (bParallelDeflate)
                 {
+                    // tdf#93553 limit to a useful amount of threads. Taking number of available
+                    // cores and allow 4-times the amount for having the queue well filled. The
+                    // 2nd pparameter is the time to wait beweeen cleanups in 10th of a second.
+                    // Both values may be added to the configuration settings if needed.
+                    static sal_Int32 nAllowedThreads(std::max(std::thread::hardware_concurrency(), 1U) * 4);
+                    rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads, 1);
+
                     // Start a new thread deflating this zip entry
                     ZipOutputEntry *pZipEntry = new ZipOutputEntry(
                             m_xContext, *pTempEntry, this, bToBeEncrypted);
-- 
2.5.5