Aiohttp: limiting POST upload

I’d like to find a way to limit HTTP POST uploads with aiohttp to a certain Mbps value.

The idea is that I’d be HTTP POSTing byte chunks and, after every chunk (depending on how long the write took), do some asyncio.sleep, achieving a certain Mbps that way.
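Roughly the pacing I have in mind (just a sketch to illustrate the idea; chunk_source and the target rate are placeholders):

import asyncio
import time

async def paced_chunks(chunk_source, target_bytes_per_sec):
    # yield chunks, sleeping just enough that the average rate
    # stays at or below target_bytes_per_sec
    started = time.monotonic()
    sent = 0
    for chunk in chunk_source:
        yield chunk
        sent += len(chunk)
        # how long sending this much data *should* have taken so far
        expected = sent / target_bytes_per_sec
        elapsed = time.monotonic() - started
        if expected > elapsed:
            await asyncio.sleep(expected - elapsed)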

So, business as usual. I’m doing this:

async with self.upload_session.post(url, data=mpwriter) as resp:
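where mpwriter is built the usual way, roughly like this (the file and field names are placeholders):

mpwriter = aiohttp.MultipartWriter("form-data")
part = mpwriter.append(open("bigfile.bin", "rb"))
part.set_content_disposition("form-data", name="file", filename="bigfile.bin")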

First I thought of subclassing aiohttp.multipart.MultipartWriter to use a subclassed aiohttp.multipart.MultipartPayloadWriter, but that didn’t fly at all.

aiohttp.multipart.MultipartWriter.write does the byte-writing, but it’s using a “writer” object that comes from … somewhere. It seems to be a StreamWriter object.

How could I sneak a custom StreamWriter object into this pipeline? It goes rather deep into the aiohttp infrastructure… I tried to follow the ClientSession class to its post method but can’t figure it out.

Help!

I agree that this task is super complicated.

mpwriter is of type aiohttp.MultipartWriter.
In turn, MultipartWriter is a payload instance; the base Payload class is defined in payload.py.

You can create a wrapper that accepts a MultipartWriter, supports all the required Payload properties, and in its own async def write(...) method calls await mpwriter.write(...), adding await asyncio.sleep(...) when needed.
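A rough, untested sketch of what such a wrapper could look like (the _SleepingWriter proxy, the fixed delay, and the class name are placeholders; depending on the aiohttp version, Payload may require implementing a bit more than write()):

import asyncio
from typing import Any, Optional

import aiohttp
from aiohttp import payload
from aiohttp.abc import AbstractStreamWriter


class _SleepingWriter:
    """Proxy around the stream writer handed to Payload.write();
    sleeps after every chunk (placeholder helper, fixed delay)."""

    def __init__(self, writer: AbstractStreamWriter, delay: float) -> None:
        self._writer = writer
        self._delay = delay

    async def write(self, chunk: bytes) -> None:
        await self._writer.write(chunk)
        await asyncio.sleep(self._delay)

    def __getattr__(self, name: str) -> Any:
        # delegate drain(), write_eof(), etc. to the real writer
        return getattr(self._writer, name)


class ThrottledMultipartPayload(payload.Payload):
    """Payload that forwards everything to a MultipartWriter,
    but hands it the sleeping writer so each chunk is paced."""

    def __init__(self, mpwriter: aiohttp.MultipartWriter, delay: float = 0.1) -> None:
        # copy the content type (including the multipart boundary) from the wrapped writer
        super().__init__(mpwriter, content_type=mpwriter.content_type)
        self._delay = delay

    @property
    def size(self) -> Optional[int]:
        return self._value.size

    async def write(self, writer: AbstractStreamWriter) -> None:
        await self._value.write(_SleepingWriter(writer, self._delay))

You would then pass data=ThrottledMultipartPayload(mpwriter, delay=0.05) to post(); all byte-writing done by the MultipartWriter goes through the sleeping proxy.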

Thank you.

One possibility would be to just add this kind of feature to the MultipartWriter in the official aiohttp version.

Would that make any sense…?

Any brief instructions/clarifications on how to create that wrapper you mentioned would be appreciated.

Sorry, I have no time to work on this particular thing; I have more important targets in the aiohttp roadmap.
Thanks for your understanding.

Making a wrapper that conforms to Payload ABC but redirects everything to MultipartWriter is not very hard.

Rather than sleep to try to control the flow, you may want to look at the leaky bucket algorithm to control the rate of data.

I’ve written up an asyncio implementation that I really should turn into a PyPI project you could use for this.

I don’t think this is something that aiohttp should provide however; rate limiting is quite a specialized and domain-specific task.


I think leaky-bucket is OK on the server side… if it can’t handle incoming data, then something is dropped.

I was talking about client side: sending the data to the server slooowly (and not dropping / “leaking” anything)

Thanks for the link. I’ll take a look at it at some point.

You misunderstand what the algorithm does. It doesn’t leak data, it drips out access to a limited resource based on how much time has passed.

At least, that’s what my implementation does; there is no overflow where requests for access are denied outright. That’d require code that uses it to check .has_capacity() and choose not to wait.
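To make the interface concrete, a minimal bucket along those lines could look roughly like this (a sketch of the concept only, not the implementation linked above; acquire() assumes amount never exceeds max_rate):

import asyncio
import time


class AsyncLeakyBucket:
    """Allow at most max_rate units per time_period seconds (minimal sketch)."""

    def __init__(self, max_rate: float, time_period: float = 60) -> None:
        self._max_level = max_rate
        self._rate_per_sec = max_rate / time_period
        self._level = 0.0
        self._last_check = time.monotonic()

    def _leak(self) -> None:
        # the bucket drains at a constant rate as time passes
        now = time.monotonic()
        self._level = max(0.0, self._level - (now - self._last_check) * self._rate_per_sec)
        self._last_check = now

    def has_capacity(self, amount: float = 1) -> bool:
        self._leak()
        return self._level + amount <= self._max_level

    async def acquire(self, amount: float = 1) -> None:
        # block until `amount` fits into the bucket, then account for it
        while not self.has_capacity(amount):
            needed = (self._level + amount - self._max_level) / self._rate_per_sec
            await asyncio.sleep(needed)
        self._level += amount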

Here’s a Payload subclass that uses a leaky bucket to limit the rate at which it writes. It takes an async iterable of chunks:

import aiohttp
import aiohttp.abc
from typing import Any

# AsyncLeakyBucket is the leaky bucket implementation linked above

class RateLimitedAsyncIterablePayload(aiohttp.AsyncIterablePayload):
    """Rate-limited payload

    Writes data from an async iterable at a rate of at most *max_rate* bytes per *time* seconds.
    """
    _bucket: AsyncLeakyBucket
    max_rate: int
    time: int

    def __init__(self,
                 value: Any,
                 *args: Any,
                 max_rate: int = 1024,
                 time: int = 60,
                 **kwargs: Any) -> None:
        if max_rate <= 0:
            raise ValueError(f"Rate can't be negative or zero, got {max_rate}")
        if time <= 0:
            raise ValueError(f"Time can't be negative or zero, got {time}")

        super().__init__(value, *args, **kwargs)

        self._bucket = AsyncLeakyBucket(max_rate, time)
        self.max_rate, self.time = max_rate, time

    async def write(self, writer: aiohttp.abc.AbstractStreamWriter) -> None:
        if self._iter:
            try:
                max_rate = self.max_rate
                while True:
                    chunk = await self._iter.__anext__()
                    size = len(chunk)
                    while size:
                        # handle chunks larger than the max_rate by breaking them up further.
                        to_write = chunk
                        if size > max_rate:
                            to_write, chunk = chunk[:max_rate], chunk[max_rate:]
                        size -= len(to_write)
                        await self._bucket.acquire(len(to_write))  # blocks until capacity is available
                        await writer.write(to_write)
            except StopAsyncIteration:
                self._iter = None

and then pass RateLimitedAsyncIterablePayload(chunk_iterable, max_rate=2048, time=60) as the data argument (max_rate and time must be given as keywords, since they come after *args), and the data will be written to the socket at a rate of at most 2 KB per minute.
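For completeness, rough usage of this payload (a sketch assuming the bucket class above; the URL, file name, and chunk size are placeholders):

import asyncio
import aiohttp

async def chunks_from_file(path: str, chunk_size: int = 64 * 1024):
    # plain async generator yielding byte chunks (blocking reads, fine for a sketch)
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk

async def upload(url: str, path: str) -> None:
    data = RateLimitedAsyncIterablePayload(
        chunks_from_file(path), max_rate=2048, time=60)
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=data) as resp:
            resp.raise_for_status()

asyncio.run(upload("https://example.com/upload", "bigfile.bin"))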