⚑ Async helpers¢

python_utils brings itertools-style ergonomics to async for, plus tools for sampling and guarding slow async generators. Because of lazy imports, asyncio is only imported once you actually touch one of these helpers.

acount β€” an async counterΒΆ

The async twin of itertools.count, with an optional step, delay (seconds between yields) and stop value:

import asyncio
from python_utils import aio

async def main():
    async for i in aio.acount(stop=3):
        print(i)          # 0, 1, 2

asyncio.run(main())

abatcher β€” batch by size or timeΒΆ

abatcher wraps an async generator and yields lists of items. Give it a batch_size, an interval, or both β€” it flushes on whichever is reached first. That makes it ideal for chunking bursty producers without ever stalling a slow loop.

Batch purely by size:

import asyncio
from python_utils import aio, generators

async def main():
    async for batch in generators.abatcher(aio.acount(stop=10), batch_size=3):
        print(batch)      # [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]

asyncio.run(main())

Batch by time interval instead (flush at least every interval seconds). interval accepts a datetime.timedelta or a plain number of seconds:

async for batch in generators.abatcher(source(), interval=0.1):
    ...                   # whatever accumulated in the last 100ms

After each yield the interval timer resets from the current time, so a slow, blocking loop never causes a runaway burst.

There’s a synchronous counterpart too:

from python_utils import generators

list(generators.batcher(range(9), 3))   # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

aio_timeout_generator β€” sample a slow async sourceΒΆ

The async for twin of timeout_generator(): walk an async iterable until a timeout elapses, sleeping interval seconds between items. The interval can grow each round (interval_multiplier) up to an optional maximum_interval, giving you exponential backoff for free. The default iterable is acount, so you get an async counter out of the box.

import asyncio
from python_utils import time

async def main():
    # Yield roughly every 0.06s, for at most 0.1s total.
    async for i in time.aio_timeout_generator(timeout=0.1, interval=0.06):
        print(i)          # 0, 1, ...

asyncio.run(main())

aio_generator_timeout_detector β€” fail fast on a stalled generatorΒΆ

Wrap an async generator so that, if it goes quiet for longer than timeout seconds (or exceeds total_timeout overall), you find out instead of hanging forever. By default the underlying asyncio.TimeoutError is re-raised; pass an on_timeout callback to handle it your own way.

from python_utils import time

async def slow_source(): ...

guarded = time.aio_generator_timeout_detector(slow_source(), timeout=5)

# Or as a decorator:
@time.aio_generator_timeout_detector_decorator(timeout=5)
async def producer(): ...

See the API reference for the full signatures.