Receiving¤
Receiver interface and related exceptions.
Receivers¤
Messages are received from channels through
Receiver objects. Receivers
are usually created by calling channel.new_receiver()
and are async
iterators, so the easiest way to receive messages from them as
a stream is to use async for
:
If you need to receive messages in different places or expecting a particular
sequence, you can use the receive()
method:
first_message = await receiver.receive()
print(f"First message: {first_message}")
second_message = await receiver.receive()
print(f"Second message: {second_message}")
Message Transformation¤
If you need to transform the received messages, receivers provide a
map()
method to easily do so:
map()
returns a new full receiver, so you can
use it in any of the ways described above.
Message Filtering¤
If you need to filter the received messages, receivers provide a
filter()
method to easily do so:
async for message in receiver.filter(lambda x: x % 2 == 0):
print(message) # Only even numbers will be printed
As with map()
,
filter()
returns a new full receiver, so you can
use it in any of the ways described above.
Error Handling¤
Tip
For more information about handling errors, please refer to the Error Handling section of the user guide.
If there is an error while receiving a message,
a ReceiverError
exception is raised for both
receive()
method and async iteration
interface.
If the receiver has completely stopped (for example the underlying channel was
closed), a ReceiverStoppedError
exception
is raised by receive()
method.
try:
await receiver.receive()
except ReceiverStoppedError as error:
print("The receiver was stopped")
except ReceiverError as error:
print(f"There was an error trying to receive: {error}")
When used as an async iterator, the iteration will just stop without raising an exception:
try:
async for message in receiver:
print(message)
except ReceiverStoppedError as error:
print("Will never happen")
except ReceiverError as error:
print(f"There was an error trying to receive: {error}")
# If we get here, the receiver was stopped
Advanced Usage¤
Warning
This section is intended for library developers that want to build other low-level abstractions on top of channels. If you are just using channels, you can safely ignore this section.
Receivers extend on the async iterator protocol by providing
a ready()
and
a consume()
method.
The ready()
method is used to await until the
receiver has a new message available, but without actually consuming it. The
consume()
method consumes the next available
message and returns it.
ready()
can be called multiple times, and it
will return immediately if the receiver is already ready.
consume()
must be called only after
ready()
is done and only once, until the next
call to ready()
.
Exceptions are never raised by ready()
, they
are always delayed until consume()
is
called.