Blob Blame History Raw
From 29353a92f01280ee2af3cc25ec80032db28a4e98 Mon Sep 17 00:00:00 2001
From: Adi Roiban <adi.roiban@chevah.com>
Date: Tue, 16 Jan 2024 10:52:21 +0100
Subject: [PATCH] Remove the usage of cgi.parse_multipart and replace with
 email module

---
 src/twisted/web/http.py | 87 ++++++++++++++++++++---------------------
 1 file changed, 42 insertions(+), 45 deletions(-)

diff --git a/src/twisted/web/http.py b/src/twisted/web/http.py
index 2bad147..d4b12d8 100644
--- a/src/twisted/web/http.py
+++ b/src/twisted/web/http.py
@@ -100,13 +100,14 @@ __all__ = [
 import base64
 import binascii
 import calendar
-import cgi
 import math
 import os
 import re
 import tempfile
 import time
 import warnings
+from email import message_from_bytes
+from email.message import EmailMessage
 from io import BytesIO
 from typing import AnyStr, Callable, List, Optional, Tuple
 from urllib.parse import (
@@ -224,15 +225,40 @@ weekdayname_lower = [name.lower() for name in weekdayname]
 monthname_lower = [name and name.lower() for name in monthname]
 
 
-def _parseHeader(line):
-    # cgi.parse_header requires a str
-    key, pdict = cgi.parse_header(line.decode("charmap"))
+def _parseContentType(line: bytes) -> bytes:
+    """
+    Return the media type (e.g. multipart/form-data) of a Content-Type header value.
+    """
+    msg = EmailMessage()
+    msg["content-type"] = line.decode("charmap")
+    key = msg.get_content_type()
+    encodedKey = key.encode("charmap")
+    return encodedKey
+
+
+class _MultiPartParseException(Exception):
+    """
+    Raised when a multipart/form-data payload cannot be parsed.
+    """
+
 
-    # We want the key as bytes, and cgi.parse_multipart (which consumes
-    # pdict) expects a dict of str keys but bytes values
-    key = key.encode("charmap")
-    pdict = {x: y.encode("charmap") for x, y in pdict.items()}
-    return (key, pdict)
+def _getMultiPartArgs(content, ctype):
+    """
+    Parse a multipart/form-data body into a dict of field name to list of payloads.
+    """
+    result = {}
+    multiPartHeaders = b"MIME-Version: 1.0\r\n" + b"Content-Type: " + ctype + b"\r\n"
+    msg = message_from_bytes(multiPartHeaders + content)
+    if not msg.is_multipart():
+        raise _MultiPartParseException("Not a multipart.")
+    # A field name may repeat across parts, so keep every value, not only the last.
+    for part in msg.get_payload():
+        name = part.get_param("name", header="content-disposition")
+        if not name:
+            continue
+        payload = part.get_payload(decode=True)
+        result.setdefault(name.encode("utf8"), []).append(payload)
+    return result
 
 
 def urlparse(url):
@@ -973,47 +999,18 @@ class Request:
 
         if self.method == b"POST" and ctype and clength:
             mfd = b"multipart/form-data"
-            key, pdict = _parseHeader(ctype)
-            # This weird CONTENT-LENGTH param is required by
-            # cgi.parse_multipart() in some versions of Python 3.7+, see
-            # bpo-29979. It looks like this will be relaxed and backported, see
-            # https://github.com/python/cpython/pull/8530.
-            pdict["CONTENT-LENGTH"] = clength
+            key = _parseContentType(ctype)
             if key == b"application/x-www-form-urlencoded":
                 args.update(parse_qs(self.content.read(), 1))
             elif key == mfd:
                 try:
-                    cgiArgs = cgi.parse_multipart(
-                        self.content,
-                        pdict,
-                        encoding="utf8",
-                        errors="surrogateescape",
-                    )
-
-                    # The parse_multipart function on Python 3.7+
-                    # decodes the header bytes as iso-8859-1 and
-                    # decodes the body bytes as utf8 with
-                    # surrogateescape -- we want bytes
-                    self.args.update(
-                        {
-                            x.encode("iso-8859-1"): [
-                                z.encode("utf8", "surrogateescape")
-                                if isinstance(z, str)
-                                else z
-                                for z in y
-                            ]
-                            for x, y in cgiArgs.items()
-                            if isinstance(x, str)
-                        }
-                    )
-                except Exception as e:
-                    # It was a bad request, or we got a signal.
+                    self.content.seek(0)
+                    content = self.content.read()
+                    self.args.update(_getMultiPartArgs(content, ctype))
+                except _MultiPartParseException:
+                    # It was a bad request.
                     self.channel._respondToBadRequestAndDisconnect()
-                    if isinstance(e, (TypeError, ValueError, KeyError)):
-                        return
-                    else:
-                        # If it's not a userspace error from CGI, reraise
-                        raise
+                    return
 
             self.content.seek(0, 0)
 
-- 
2.43.0