async_stagger API reference

The Main Package

await async_stagger.create_connected_sock(host, port, *, family=AddressFamily.AF_UNSPEC, proto=0, flags=0, local_addrs=None, delay=0.25, resolver=<function concurrent_resolver>, raise_exc_group=False)

Connect to (host, port) and return a connected socket.

This function implements RFC 6555 Happy Eyeballs and some features of RFC 8305 Happy Eyeballs v2. When a host name resolves to multiple IP addresses, connection attempts are made in parallel with staggered start times, and the one completing fastest is used. The resolved addresses can be interleaved by address family, so even if network connectivity for one address family is broken (when IPv6 fails, for example), connections still complete quickly. IPv6 and IPv4 addresses of a hostname can also be resolved in parallel.

(Some fancier features specified in RFC 8305, such as statefulness and features related to NAT64 and DNS64, are not implemented. Destination address sorting is left to the operating system; it is assumed that the addresses returned by getaddrinfo() are already sorted according to the OS’s preferences.)

Most of the arguments should be familiar from the various socket and asyncio methods. delay and resolver control Happy Eyeballs-specific behavior. local_addrs is a new argument providing features not specific to Happy Eyeballs.

Parameters:
  • host (str | bytes | None) – Host name to connect to. Unlike asyncio.open_connection() there is no default, but it’s still possible to manually specify None here.

  • port (str | bytes | int | None) – Port number to connect to. Similar to host, None can be specified here as well.

  • family (int) – Address family. Specify socket.AF_INET or socket.AF_INET6 here to limit the type of addresses used. See documentation on the socket module for details.

  • proto (int) – Socket protocol. Since the socket type is always socket.SOCK_STREAM, proto can usually be left unspecified.

  • flags (int) – Flags passed to getaddrinfo. See documentation on socket.getaddrinfo() for details.

  • local_addrs (Union[Iterable[tuple], AsyncIterable[tuple]]) – A sync or async iterable of address tuples, all of which are candidates for locally binding the socket to. This allows e.g. providing one IPv4 and one IPv6 address. Addresses must be already resolved. In particular, IPv4 addresses must be 2-tuples of (host, port), and IPv6 addresses must be 4-tuples of (host, port, flowinfo, scope_id), where host is the literal representation of the address.

  • delay (Optional[float]) – Amount of time to wait before making connections to different addresses. This is the “Connect Attempt Delay” as defined in RFC 8305.

  • resolver (ResolverType) –

    The resolver to use. To customize the behavior of the resolver, use functools.partial() to bind arguments to the resolver before passing it in. For example, to set “First Address Family Count” to 2:

    resolver = functools.partial(async_stagger.resolvers.concurrent_resolver, first_addr_family_count=2)
    sock = await async_stagger.create_connected_sock(..., resolver=resolver)
    

  • raise_exc_group (bool) – Determines what exception to raise when all connection attempts fail. If set to True, raise an instance of ExceptionGroup containing all the individual exceptions raised by each connection and address resolution attempt. When set to False (default), an exception is raised the same way as asyncio.open_connection(): if all the connection attempt exceptions have the same str, one of them is raised; otherwise an instance of OSError is raised whose message contains str representations of all connection attempt exceptions.
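The raise_exc_group=False rule can be sketched in a few lines. This is an illustration of the rule as described here, not the library's code; the helper name combine_connect_errors and the "Multiple exceptions:" message prefix are assumptions made for the example.

```python
def combine_connect_errors(exceptions):
    """Return the single exception to raise for a list of failed attempts.

    Hypothetical helper: mirrors the documented raise_exc_group=False rule.
    """
    if not exceptions:
        raise ValueError("no exceptions to combine")
    messages = [str(exc) for exc in exceptions]
    if all(msg == messages[0] for msg in messages):
        # Every attempt failed with the same message: reuse one original.
        return exceptions[0]
    # Otherwise summarize all failures in one OSError message.
    # The exact message format is an assumption.
    return OSError("Multiple exceptions: " + ", ".join(messages))


same = [ConnectionRefusedError("refused"), ConnectionRefusedError("refused")]
mixed = [ConnectionRefusedError("refused"), TimeoutError("timed out")]
print(repr(combine_connect_errors(same)))
print(repr(combine_connect_errors(mixed)))
```

With raise_exc_group=True the individual exceptions would instead be kept separate inside an ExceptionGroup, which is easier to inspect programmatically.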

Return type:

socket

Returns:

The connected socket.socket object.

Added in version v0.1.3: the local_addrs parameter.

Added in version v0.2.1: the async_dns and resolution_delay parameters.

Changed in version v0.4.0: local_addrs parameter now takes sync or async iterables of already-resolved addresses. Support for host names is removed.

Changed in version v0.4.0: the detailed_exceptions parameter is replaced by raise_exc_group. When specified, an ExceptionGroup is raised.

Removed in version v0.4.0: the loop parameter.

Removed in version v0.4.0: the local_addr parameter. Use local_addrs instead.
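Since local_addrs only accepts already-resolved address tuples, a small helper can build them from IP literals. The sketch below uses only the standard library; to_local_addr is a hypothetical name, and port 0 (let the OS choose) with zero flowinfo and scope_id are assumptions chosen for illustration.

```python
import ipaddress


def to_local_addr(literal, port=0):
    """Build a bind-address tuple from an IP literal, without name resolution."""
    ip = ipaddress.ip_address(literal)  # raises ValueError for host names
    if ip.version == 4:
        # IPv4 addresses are 2-tuples of (host, port).
        return (str(ip), port)
    # IPv6 addresses are 4-tuples of (host, port, flowinfo, scope_id).
    return (str(ip), port, 0, 0)


# One IPv4 and one IPv6 candidate (documentation-range addresses).
local_addrs = [to_local_addr("192.0.2.10"), to_local_addr("2001:db8::10")]
print(local_addrs)
```

A plain list like this is a sync iterable, so it should be usable as the local_addrs argument as described above.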

await async_stagger.create_connection(protocol_factory, host, port, **kwargs)

Connect to (host, port) and return (transport, protocol).

This is a replacement for asyncio.loop.create_connection() with added Happy Eyeballs. Refer to that function’s documentation for explanations of these arguments: protocol_factory, ssl, and server_hostname. Refer to create_connected_sock() for all other arguments.

Return type:

tuple[Transport, Protocol]

Returns:

(transport, protocol), the same as asyncio.loop.create_connection().

await async_stagger.open_connection(host, port, **kwargs)

Connect to (host, port) and return (reader, writer).

This is a replacement for asyncio.open_connection() with added Happy Eyeballs. Refer to the documentation of that function for what limit does, and refer to create_connection() and create_connected_sock() for everything else.

Return type:

tuple[StreamReader, StreamWriter]

Returns:

(reader, writer), the same as asyncio.open_connection().

await async_stagger.staggered_race(coro_fns, delay)

Run coroutines with staggered start times and take the first to finish.

This function takes an async iterable of coroutine functions. The first one is retrieved and started immediately. From then on, whenever the immediately preceding one fails (raises an exception), or when delay seconds have passed, the next coroutine is retrieved and started. This continues until one of the coroutines completes successfully, in which case all others are cancelled, or until all coroutines fail.

The coroutines provided should be well-behaved in the following way:

  • They should only return if completed successfully.

  • They should always raise an exception if they did not complete successfully. In particular, if they handle cancellation, they should probably reraise, like this:

    try:
        # do work
    except asyncio.CancelledError:
        # undo partially completed work
        raise
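A runnable variant of this pattern, using only asyncio, might look like the following. The names attempt and demo are made up for the example, and asyncio.sleep() stands in for real work such as a connection attempt.

```python
import asyncio


async def attempt(log):
    """A well-behaved attempt: cleans up on cancellation, then reraises."""
    log.append("started")
    try:
        await asyncio.sleep(10)  # stands in for the real work
    except asyncio.CancelledError:
        log.append("cleaned up")  # undo partially completed work
        raise  # signal that this attempt did not complete successfully
    return "done"


async def demo():
    log = []
    task = asyncio.ensure_future(attempt(log))
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


print(asyncio.run(demo()))
```

Swallowing the CancelledError instead of reraising would make the attempt look like a success to whoever is racing it.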
    
Parameters:
  • coro_fns (AsyncIterable[Callable[[], Awaitable]]) – an async iterable of coroutine functions, i.e. callables that return a coroutine object when called. Use functools.partial() or lambdas to pass arguments. If you want to use a regular iterable here, wrap it with aiter_from_iter().

  • delay (Optional[float]) – amount of time, in seconds, between starting coroutines. If None, the coroutines will run sequentially.

Return type:

tuple[Any, Optional[int], list[Optional[Exception]], Optional[Exception]]

Returns:

tuple (winner_result, winner_index, coro_exc, aiter_exc) where

  • winner_result: the result of the winning coroutine, or None if no coroutines won.

  • winner_index: the index of the winning coroutine in coro_fns, or None if no coroutines won. If the winning coroutine may return None on success, winner_index can be used to definitively determine whether any coroutine won.

  • coro_exc: list of exceptions raised by the coroutines. len(coro_exc) is equal to the number of coroutines actually started, and the order is the same as in coro_fns. The winning coroutine’s entry is None.

  • aiter_exc: exception raised by the coro_fns async iterable, or None if coro_fns was iterated to completion without raising any exception.
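One way to consume this 4-tuple is sketched below. summarize_race is a hypothetical helper, and the two tuples are hand-built stand-ins for real return values rather than output from an actual race.

```python
def summarize_race(result):
    """Describe a (winner_result, winner_index, coro_exc, aiter_exc) tuple."""
    winner_result, winner_index, coro_exc, aiter_exc = result
    if winner_index is not None:
        # winner_result may legitimately be None, so test the index instead.
        return f"coroutine {winner_index} won with {winner_result!r}"
    failures = [repr(exc) for exc in coro_exc]
    if aiter_exc is not None:
        failures.append(f"iterating coro_fns failed: {aiter_exc!r}")
    return "no winner: " + "; ".join(failures)


# The second coroutine won and returned None; the first one failed.
won = (None, 1, [ConnectionRefusedError("refused")] + [None], None)
# Nothing won, and the async iterable itself raised.
lost = (None, None, [ConnectionRefusedError("refused")], RuntimeError("resolver broke"))

print(summarize_race(won))
print(summarize_race(lost))
```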

Changed in version v0.2.0: coro_fns argument now takes an async iterable instead of a regular iterable.

Changed in version v0.3.0: The return value is now a 4-tuple. aiter_exc is added.

Changed in version v0.4.0: Removed loop parameter.

resolvers

Code related to resolving host names to IP addresses.

A resolver is a callable with signature resolver(host, port, *, family=0, type=0, proto=0, flags=0) (and more optional arguments if necessary) that returns an async iterable of 5-tuples (family, type, proto, canonname, sockaddr). This is almost the same signature as socket.getaddrinfo(), except that it returns an async iterable instead of a list.

This module provides two resolvers. basic_resolver() resolves IPv4 and IPv6 addresses together, and concurrent_resolver() (the default used by create_connected_sock()) resolves IPv4 and IPv6 addresses separately in parallel.

Extending resolvers

Both resolvers provided here can use alternate implementations of getaddrinfo. For example, say you have implemented an async function getaddrinfo_dns_over_https(server, host, port, *, family=0, type=0, proto=0, flags=0) that resolves host names over DoH. You can then do:

from functools import partial
getaddrinfo = partial(getaddrinfo_dns_over_https, server='https://cloudflare-dns.com/dns-query')
doh_resolver = partial(async_stagger.resolvers.concurrent_resolver, getaddrinfo_async=getaddrinfo)
reader, writer = await async_stagger.open_connection(host, port, resolver=doh_resolver)

to use Cloudflare’s DoH server when making connections.

And of course, you can implement entirely new resolvers.
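As a rough illustration, a resolver that serves addresses from a fixed table could be written as an async generator. Everything named here (STATIC_HOSTS, static_resolver, collect, the host name) is hypothetical; always yielding socket.SOCK_STREAM and raising socket.gaierror for unknown hosts are assumptions, not documented requirements.

```python
import asyncio
import socket

# Hypothetical table of pre-resolved addresses (documentation ranges).
STATIC_HOSTS = {"service.internal": ["2001:db8::1", "192.0.2.1"]}


async def static_resolver(host, port, *, family=0, type=0, proto=0, flags=0):
    """Yield getaddrinfo-style 5-tuples for hosts listed in STATIC_HOSTS."""
    if host not in STATIC_HOSTS:
        raise socket.gaierror(f"no static entry for {host!r}")
    for literal in STATIC_HOSTS[host]:
        is_v6 = ":" in literal
        addr_family = socket.AF_INET6 if is_v6 else socket.AF_INET
        if family not in (socket.AF_UNSPEC, addr_family):
            continue  # caller asked for the other address family
        sockaddr = (literal, port, 0, 0) if is_v6 else (literal, port)
        yield (addr_family, socket.SOCK_STREAM, proto, "", sockaddr)


async def collect(aiterable):
    """Drain an async iterable into a list (demo helper)."""
    return [item async for item in aiterable]


print(asyncio.run(collect(static_resolver("service.internal", 443))))
```

Calling an async generator function returns an async iterator immediately, which fits the resolver contract of returning an async iterable of 5-tuples.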

async for ... in async_stagger.resolvers.basic_resolver(host, port, *, family=AddressFamily.AF_UNSPEC, type=0, proto=0, flags=0, first_addr_family_count=1, getaddrinfo_async=<function _getaddrinfo>)

The basic resolver.

Resolves all IP addresses in one call to getaddrinfo_async. The returned addresses are then interleaved by family.

For arguments host, port, family, type, proto, flags, refer to socket.getaddrinfo().

Parameters:
  • first_addr_family_count (int) – “First Address Family Count” as defined in RFC 8305, i.e. the reordered list will have this many addresses for the first address family, and the rest will be interleaved one to one.

  • getaddrinfo_async (AsyncGetAddrInfoType) – the async getaddrinfo implementation that’s used to actually resolve the host.

Return type:

AsyncIterator[tuple[int, int, int, str, tuple]]
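The interleaving controlled by first_addr_family_count can be sketched as follows. This is a standalone illustration of the reordering rule, not the resolver's actual code; interleave_by_family is a hypothetical name and the input tuples are shortened to (family, label) for readability.

```python
import itertools
import socket


def interleave_by_family(addrinfos, first_addr_family_count=1):
    """Reorder addrinfo tuples so that address families alternate."""
    # Group by family (first tuple element), keeping first-seen order.
    by_family = {}
    for info in addrinfos:
        by_family.setdefault(info[0], []).append(info)
    groups = list(by_family.values())

    reordered = []
    if first_addr_family_count > 1 and groups:
        # Move the extra head-start addresses of the first family up front.
        extra = first_addr_family_count - 1
        reordered.extend(groups[0][:extra])
        del groups[0][:extra]
    # Interleave whatever remains one to one across families.
    for batch in itertools.zip_longest(*groups):
        reordered.extend(info for info in batch if info is not None)
    return reordered


V6, V4 = socket.AF_INET6, socket.AF_INET
infos = [(V6, "a"), (V6, "b"), (V6, "c"), (V4, "x"), (V4, "y")]
print([label for _, label in interleave_by_family(infos)])
print([label for _, label in interleave_by_family(infos, 2)])
```

With a count of 1 the families simply alternate; with a count of 2 the first family gets two addresses before the first address of the other family appears.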

async_stagger.resolvers.concurrent_resolver(host, port, *, family=AddressFamily.AF_UNSPEC, type=0, proto=0, flags=0, resolution_delay=0.05, first_addr_family_count=1, raise_exc_group=False, getaddrinfo_async=<function _getaddrinfo>)

The concurrent resolver.

When family == socket.AF_UNSPEC, two calls to getaddrinfo_async are run in parallel, one for IPv6 and one for IPv4. Addresses are yielded once either run returns with addresses.

When family != socket.AF_UNSPEC, the call is dispatched to basic_resolver().

For arguments host, port, family, type, proto, flags, refer to socket.getaddrinfo().

Parameters:
  • resolution_delay (float) – Amount of time to wait for IPv6 addresses to resolve if IPv4 addresses are resolved first. This is the “Resolution Delay” as defined in RFC 8305.

  • first_addr_family_count (int) – “First Address Family Count” as defined in RFC 8305, i.e. the reordered list will have this many addresses for the first address family, and the rest will be interleaved one to one.

  • raise_exc_group (bool) – If set to True, when both IPv6 and IPv4 resolutions fail, raise an ExceptionGroup containing both exceptions. If set to False, raise a socket.gaierror whose message contains str representations of both exceptions.

  • getaddrinfo_async (AsyncGetAddrInfoType) – the async getaddrinfo implementation that’s used to actually resolve the host.

Return type:

AsyncIterator[tuple[int, int, int, str, tuple]]

aitertools

Tools for working with async iterators.

async for ... in async_stagger.aitertools.aiter_from_iter(iterable)

Wrap an async iterator around a regular iterator.

Parameters:

iterable (Iterable[TypeVar(T)]) – a regular iterable.

Return type:

AsyncIterator[TypeVar(T)]

Returns:

An async iterator yielding the same items as the original iterable.
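Conceptually, such a wrapper is a very small async generator. The sketch below is an equivalent written from the description above, not the library's source; aiter_from_iter_sketch and collect are names made up for the example.

```python
import asyncio


async def aiter_from_iter_sketch(iterable):
    """Yield the items of a regular iterable from an async iterator."""
    for item in iterable:
        yield item


async def collect(aiterable):
    """Drain an async iterable into a list (demo helper)."""
    return [item async for item in aiterable]


print(asyncio.run(collect(aiter_from_iter_sketch(["a", "b", "c"]))))
```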

await async_stagger.aitertools.aiterclose(aiterator)

Close the async iterator if possible.

Async generators have an aclose() method that closes the generator and cleans up associated resources. Plain async iterators do not have anything similar, but PEP 533 suggests adding an __aiterclose__() method, and having it called automatically when exiting from an async for loop.

This function tries to close the async iterator using either method, and if neither is available, does nothing.

Parameters:

aiterator (AsyncIterator) – the async iterator to close.
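The closing logic described above might be approximated like this. It is a sketch under the assumption that aclose() is tried before __aiterclose__(); the order of preference is not stated here, and all names in the example are hypothetical.

```python
import asyncio


async def aiterclose_sketch(aiterator):
    """Close an async iterator if it offers a way to do so."""
    if hasattr(aiterator, "aclose"):
        await aiterator.aclose()  # async generators
    elif hasattr(aiterator, "__aiterclose__"):
        await aiterator.__aiterclose__()  # PEP 533 style iterators
    # Neither method available: nothing to do.


async def numbers(log):
    try:
        yield 1
        yield 2
    finally:
        log.append("closed")  # runs when the generator is closed


class PlainAsyncIterator:
    """An async iterator with no closing method at all."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        raise StopAsyncIteration


async def demo():
    log = []
    gen = numbers(log)
    first = await gen.__anext__()  # start the generator, then abandon it
    await aiterclose_sketch(gen)
    await aiterclose_sketch(PlainAsyncIterator())  # silently does nothing
    return first, log


print(asyncio.run(demo()))
```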

async for ... in async_stagger.aitertools.product(*aiterables, repeat=1)

Async version of itertools.product().

Compute the cartesian product of input iterables. The arguments are analogous to its itertools counterpart.

The input async iterables are evaluated lazily. As a result, the last input iterable is iterated and exhausted first, then the next-to-last is iterated, and so on.

Parameters:
  • aiterables (AsyncIterable) – input async iterables.

  • repeat (int) – used to compute the product of input async iterables with themselves.

Return type:

AsyncIterator
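To make the lazy evaluation order concrete, here is a simplified recursive sketch of an async cartesian product. It is not the library's implementation: lazy_product, from_list and collect are hypothetical names, the repeat argument is omitted, and caching the combinations of the trailing inputs is a design choice made for brevity.

```python
import asyncio
import itertools


async def from_list(items):
    """Turn a regular iterable into an async iterator (demo helper)."""
    for item in items:
        yield item


async def lazy_product(*aiterables):
    """Yield tuples in the same order as itertools.product()."""
    if not aiterables:
        yield ()
        return
    first, rest = aiterables[0], aiterables[1:]
    tails = []  # combinations of the remaining inputs, filled on first pass
    first_pass = True
    async for item in first:
        if first_pass:
            # The trailing inputs are consumed completely here, before the
            # second item of the leading input is even requested.
            async for tail in lazy_product(*rest):
                tails.append(tail)
                yield (item, *tail)
            first_pass = False
        else:
            for tail in tails:
                yield (item, *tail)


async def collect(aiterable):
    """Drain an async iterable into a list (demo helper)."""
    return [item async for item in aiterable]


pairs = asyncio.run(collect(lazy_product(from_list("ab"), from_list([1, 2]))))
print(pairs)
```

Because the leading input is only advanced after the trailing ones are exhausted, an expensive first input (for example, slow address resolution) is pulled one item at a time.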

exceptions