1. 程式人生 > >給大家分享一篇 http上傳協議之檔案流實現,輕鬆支援大檔案上傳

給大家分享一篇 http上傳協議之檔案流實現,輕鬆支援大檔案上傳

最近在公司進行業務開發時遇到了一些問題,當需要上傳一個較大的檔案時,經常會遇到記憶體被大量佔用的情況。公司之前使用的web框架是一個老前輩實現的。在實現multipart/form-data型別的post請求解析時, 是將post請求體一次性讀到記憶體中再做解析的,從而導致記憶體佔用過大。而我之前為公司開發的框架

ShichaoMa/star_builder
​github.com
圖示
是基於apistar這個asgi框架的,而apistar在解析mutilpart時使用的時flask作者編寫的

https://github.com/pallets/werkzeug
​github.com
flask和django在對待multipart報文解析使用的方案基本是一致的,通過持續的解析請求體,將解析出來的檔案內容放入一個工廠類建立的類檔案物件中,工廠類在django中返回uploader的子類,在flask中叫作stream_factory。可以使用基於記憶體的,也可以使用基於臨時檔案的。

但是apistar作者在借用werkzeug的FormDataParser解析時,卻直接將一個BytesIO傳入了!而BytesIO中存放的是全量請求體,這勢必會全部存在於記憶體中!那麼帶來的問題就是,當上傳大檔案時,記憶體會被撐爆!程式碼如下:

   class MultiPartCodec(BaseCodec):
    media_type = 'multipart/form-data'

    def decode(self, bytestring, headers, **options):
        try:
            content_length = max(0
, int(headers['content-length'])) except (KeyError, ValueError, TypeError): content_length = None try: mime_type, mime_options = parse_options_header(headers['content-type']) except KeyError: mime_type, mime_options = '', {} body_file = BytesIO(bytestring) parser = FormDataParser() stream, form, files = parser.parse(body_file, mime_type, content_length, mime_options) return
ImmutableMultiDict(chain(form.items(multi=True), files.items(multi=True)))

其實想必這也是不得已的事情,因為apistar支援ASGI協議,這就導致了每次請求IO都是非同步的,非同步read介面和同步介面呼叫方式肯定不一樣,所以作者想偷懶不自己實現一套非同步解析方案,那麼只能麼做。

作者想偷懶我可以理解,但是公司對我的要求讓我感覺鴨梨山大,之前基於s3檔案的上傳服務是由我開發的,使用的框架也是我依賴apistar開發的star_builder,現在公司要求廢棄掉公司之前的檔案上傳服務(也就是基於老前輩web框架開發的那個),將所有介面全部轉移到我開發的服務上來。那麼勢必要求我一併解決掉大檔案上傳的問題。所以沒有辦法,只能為apistar的作者造個輪子接上先用著了。

在我簡單瞭解了multipart/form-data協議之後,實現了一個FileStream類和File類,每個類都返回可非同步迭代物件,FileStream迭代File物件,File物件迭代資料,迭代過程實時解析請求體,實時發現檔案物件,實時處理得到的檔案資料。以這種方式處理上傳的檔案,對記憶體不會產生任何壓力。

FIleStream的實現如下:

class FileStream(object):

    def __init__(self, receive, boundary):
        self.receive = receive
        self.boundary = boundary
        self.body = b""
        self.closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await File.from_boundary(self, self.receive, self.boundary)

FileStream支援非同步迭代,每次返回一個File物件。同時FIleStream儲存已讀但未返回到應用層的請求體資料。

File的實現如下:

class File(object):
    mime_type_regex = re.compile(b"Content-Type: (.*)")
    disposition_regex = re.compile(
        rb"Content-Disposition: form-data;"
        rb"(?: name=\"(?P<name>[^;]*?)\")?"
        rb"(?:; filename\*?=\"?"
        rb"(?:(?P<enc>.+?)'"
        rb"(?P<lang>\w*)')?"
        rb"(?P<filename>[^\"]*)\"?)?")

    def __init__(self, stream, receive, boundary, name, filename, mimetype):
        self.mimetype = mimetype
        self.receive = receive
        self.filename = filename
        self.name = name
        self.stream = stream
        self.tmpboundary = b"\r\n--" + boundary
        self.boundary_len = len(self.tmpboundary)
        self._last = b""
        self._size = 0
        self.body_iter = self._iter_content()

    def __aiter__(self):
        return self.body_iter

    def __str__(self):
        return f"<{self.__class__.__name__} " \
               f"name={self.name} " \
               f"filename={self.filename} >"

    __repr__ = __str__

    def iter_content(self):
        return self.body_iter

    async def _iter_content(self):
        stream = self.stream
        while True:
            # 如果存在read過程中剩下的,則直接返回
            if self._last:
                yield self._last
                continue

            index = self.stream.body.find(self.tmpboundary)
            if index != -1:
                # 找到分隔線,返回分隔線前的資料
                # 並將分隔及分隔線後的資料返回給stream
                read, stream.body = stream.body[:index], stream.body[index:]
                self._size += len(read)
                yield read
                if self._last:
                    yield self._last
                break
            else:
                if self.stream.closed:
                    raise RuntimeError("Uncomplete content!")
                # 若沒有找到分隔線,為了防止分隔線被讀取了一半
                # 選擇只返回少於分隔線長度的部分body
                read = stream.body[:-self.boundary_len]
                stream.body = stream.body[-self.boundary_len:]
                self._size += len(read)
                yield read
                await self.get_message(self.receive, stream)

    async def read(self, size=10240):
        read = b""
        assert size > 0, (999, "Read size must > 0")
        while len(read) < size:
            try:
                buffer = await self.body_iter.asend(None)
            except StopAsyncIteration:
                return read
            read = read + buffer
            read, self._last = read[:size], read[size:]
        return read

    @staticmethod
    async def get_message(receive, stream):
        message = await receive()

        if not message['type'] == 'http.request':
            raise RuntimeError(
                f"Unexpected ASGI message type: {message['type']}.")

        if not message.get('more_body', False):
            stream.closed = True
        stream.body += message.get("body", b"")

    def tell(self):
        return self._size

    @classmethod
    async def from_boundary(cls, stream, receive, boundary):
        tmp_boundary = b"--" + boundary
        while not stream.closed:
            await cls.get_message(receive, stream)

            if b"\r\n\r\n" in stream.body and tmp_boundary in stream.body or \
                    stream.closed:
                break

        return cls(stream, receive, boundary,
                   *cls.parse_headers(stream, tmp_boundary))

    @classmethod
    def parse_headers(cls, stream, tmp_boundary):
        end_boundary = tmp_boundary + b"--"
        body = stream.body
        index = body.find(tmp_boundary)
        if index == body.find(end_boundary):
            raise StopAsyncIteration
        body = body[index + len(tmp_boundary):]
        header_str = body[:body.find(b"\r\n\r\n")]
        body = body[body.find(b"\r\n\r\n") + 4:]
        groups = cls.disposition_regex.search(header_str).groupdict()
        filename = groups["filename"] and unquote(groups["filename"].decode())
        if groups["enc"]:
            filename = filename.encode().decode(groups["enc"].decode())
        name = groups["name"].decode()

        mth = cls.mime_type_regex.search(header_str)
        mimetype = mth and mth.group(1).decode()
        stream.body = body
        assert name, "FileStream iterated without File consumed. "
        return name, filename, mimetype

File例項也是一個非同步可迭代物件,每次迭代從receive中實時獲取資料

ASGI - Channels 2.1.2 documentation
​channels.readthedocs.io

同時File還支援非同步read,但read本質上也是對File物件的迭代。

那麼正確的使用姿勢是怎樣的呢?

下面是star_builder構建的專案中關於FileStream在一次請求中action的demo實現。

@post("/test_upload")
    async def up(stream: FileStream):
        async for file in stream:
            if file.filename:
                with open(file.filename, "wb") as f:
                    async for chuck in file:
                        f.write(chuck)
            else:
                # 沒有filename的是其它型別的form引數
                arg = await file.read()
                print(f"Form引數:{file.name}={arg.decode()}")

使用方法非常簡單,不會生成臨時檔案,也不會佔用記憶體來儲存。實時非同步從socket中讀取資料,非要說有什麼缺點的話,就是不全部迭代完的話,是無法知道這一次請求中一共上傳了幾個檔案的。如果需要提前知道的話,可以通過前端配合通過url傳入params引數來獲取檔案相關屬性資訊。

這種實時從socket讀取的實現方案,應該是基於http協議效能最好的檔案上傳方案。歡迎評論區發表意見和建議。

 
Python學習交流群:834179111,群裡有很多的學習資料。歡迎歡迎各位前來交流學習。