Receiving

Receiver interface and related exceptions.

Receivers

Messages are received from channels through Receiver objects. Receivers are usually created by calling channel.new_receiver() and are async iterators, so the easiest way to receive messages from them as a stream is to use async for:

async for message in receiver:
    print(message)

If you need to receive messages in different places, or you expect a particular sequence of messages, you can use the receive() method:

first_message = await receiver.receive()
print(f"First message: {first_message}")

second_message = await receiver.receive()
print(f"Second message: {second_message}")

Message Transformation

If you need to transform the received messages, receivers provide a map() method to easily do so:

async for message in receiver.map(lambda x: x + 1):
    print(message)

map() returns a new, fully functional receiver, so you can use it in any of the ways described above.

Message Filtering

If you need to filter the received messages, receivers provide a filter() method to easily do so:

async for message in receiver.filter(lambda x: x % 2 == 0):
    print(message)  # Only even numbers will be printed

As with map(), filter() returns a new, fully functional receiver, so you can use it in any of the ways described above.
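Because both methods return a receiver, the calls can be chained. The following is a minimal sketch of that idea and not the library's implementation: `SketchReceiver` and `collect_even_successors` are hypothetical names, and the stand-in class is fed from a fixed iterable so that the example runs with the standard library alone.

```python
import asyncio
from collections.abc import Callable, Iterable
from typing import Generic, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class SketchReceiver(Generic[T]):
    """Hypothetical stand-in for a receiver, fed from a fixed iterable."""

    def __init__(self, messages: Iterable[T]) -> None:
        self._messages = iter(messages)

    def __aiter__(self) -> "SketchReceiver[T]":
        return self

    async def __anext__(self) -> T:
        try:
            return next(self._messages)
        except StopIteration:
            # No more messages: end the async iteration cleanly.
            raise StopAsyncIteration from None

    def map(self, func: Callable[[T], U]) -> "SketchReceiver[U]":
        # Lazily apply func to every message; the result is a receiver again.
        return SketchReceiver(func(m) for m in self._messages)

    def filter(self, predicate: Callable[[T], bool]) -> "SketchReceiver[T]":
        # Lazily drop the messages for which predicate returns False.
        return SketchReceiver(m for m in self._messages if predicate(m))


async def collect_even_successors(numbers: Iterable[int]) -> list[int]:
    # Add one to every message, then keep only the even results.
    receiver = SketchReceiver(numbers).map(lambda x: x + 1).filter(lambda x: x % 2 == 0)
    return [message async for message in receiver]


if __name__ == "__main__":
    print(asyncio.run(collect_even_successors([1, 2, 3, 4])))  # [2, 4]
```

The order of the calls matters: here the filter sees the already transformed messages, so 1 and 3 survive as 2 and 4.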

Error Handling

Tip

For more information about handling errors, please refer to the Error Handling section of the user guide.

If there is an error while receiving a message, a ReceiverError exception is raised by both the receive() method and the async iteration interface.

If the receiver has completely stopped (for example, because the underlying channel was closed), the receive() method raises a ReceiverStoppedError exception.

try:
    await receiver.receive()
except ReceiverStoppedError as error:
    print("The receiver was stopped")
except ReceiverError as error:
    print(f"There was an error trying to receive: {error}")

When a receiver is used as an async iterator and it stops, the iteration simply ends without raising an exception:

try:
    async for message in receiver:
        print(message)
except ReceiverStoppedError as error:
    print("Will never happen")
except ReceiverError as error:
    print(f"There was an error trying to receive: {error}")
# If we get here, the receiver was stopped
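The difference between the two interfaces can be illustrated with a small, self-contained sketch. Everything in it is an assumption made for illustration: the two exception classes are local stand-ins that reuse the names above (with ReceiverStoppedError assumed to be a subclass of ReceiverError, as the order of the except clauses suggests), and `SketchReceiver` and `demo` are hypothetical. It is one plausible way to get the described behavior, not the library's code.

```python
import asyncio


class ReceiverError(Exception):
    """Local stand-in for the base receiving error."""


class ReceiverStoppedError(ReceiverError):
    """Local stand-in: the receiver has stopped for good."""


class SketchReceiver:
    """Hypothetical receiver that yields a fixed list of messages, then stops."""

    def __init__(self, messages: list[int]) -> None:
        self._pending = list(messages)

    async def receive(self) -> int:
        if not self._pending:
            raise ReceiverStoppedError("receiver stopped")
        return self._pending.pop(0)

    def __aiter__(self) -> "SketchReceiver":
        return self

    async def __anext__(self) -> int:
        try:
            return await self.receive()
        except ReceiverStoppedError:
            # Only the "stopped" case ends the loop quietly; any other
            # ReceiverError would still propagate to the caller.
            raise StopAsyncIteration from None


async def demo() -> tuple[list[int], bool]:
    # Async iteration: the loop ends when the receiver stops.
    iterated = [message async for message in SketchReceiver([1, 2])]

    # receive(): the same condition is reported as an exception.
    stopped = False
    try:
        await SketchReceiver([]).receive()
    except ReceiverStoppedError:
        stopped = True
    return iterated, stopped


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([1, 2], True)
```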

Advanced Usage

Warning

This section is intended for library developers who want to build other low-level abstractions on top of channels. If you are just using channels, you can safely ignore this section.

Receivers extend the async iterator protocol by providing a ready() and a consume() method.

The ready() method is used to await until the receiver has a new message available, but without actually consuming it. The consume() method consumes the next available message and returns it.

ready() can be called multiple times, and it returns immediately if the receiver is already ready. consume() must be called only after ready() has completed, and only once until ready() is called again.

ready() never raises exceptions; they are always deferred until consume() is called.
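The two-step protocol can be sketched as follows. This is a simplified illustration under stated assumptions, not the library's implementation: `QueueReceiver` and `demo` are hypothetical, the receiver is backed by an `asyncio.Queue` in which `None` marks the end of the stream, `ReceiverStoppedError` is a local stand-in, and ready() returns nothing here, which may differ from the real signature.

```python
import asyncio
from typing import Optional


class ReceiverStoppedError(Exception):
    """Local stand-in for the exception of the same name."""


class QueueReceiver:
    """Hypothetical receiver backed by an asyncio.Queue; None marks the end."""

    def __init__(self, queue: "asyncio.Queue[Optional[int]]") -> None:
        self._queue = queue
        self._next: Optional[int] = None
        self._stopped = False

    async def ready(self) -> None:
        # Idempotent: return at once if a message or the stop is already pending.
        if self._next is not None or self._stopped:
            return
        item = await self._queue.get()
        if item is None:
            self._stopped = True  # Remember the condition; do not raise here.
        else:
            self._next = item

    def consume(self) -> int:
        # Deferred errors surface here, never in ready().
        if self._stopped:
            raise ReceiverStoppedError("receiver stopped")
        if self._next is None:
            raise RuntimeError("consume() called before ready()")
        message, self._next = self._next, None
        return message

    async def receive(self) -> int:
        # receive() is just the two steps combined.
        await self.ready()
        return self.consume()


async def demo() -> tuple[list[int], bool]:
    queue: "asyncio.Queue[Optional[int]]" = asyncio.Queue()
    for item in (10, 20, None):
        queue.put_nowait(item)
    receiver = QueueReceiver(queue)

    received = []
    await receiver.ready()
    await receiver.ready()  # Returns immediately; nothing is consumed.
    received.append(receiver.consume())
    received.append(await receiver.receive())

    await receiver.ready()  # Reaches the end marker without raising.
    stopped = False
    try:
        receiver.consume()
    except ReceiverStoppedError:
        stopped = True
    return received, stopped


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([10, 20], True)
```

Separating the wait from the consumption lets a caller wait on several receivers at once and then consume only from the one that became ready, without losing messages from the others.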