Skip to content

event

frequenz.channels.event ¤

A receiver that can be made ready directly.

Tip

Read the Event documentation for more information.

This module contains the following:

  • Event: A receiver that can be made ready directly.

Classes¤

frequenz.channels.event.Event ¤

Bases: Receiver[None]

A receiver that can be made ready directly.

Usage¤

There are cases where it is useful to be able to send a signal to a select() loop, for example, to stop a loop from outside the loop itself.

To do that, you can use an Event receiver and call set() on it when you want to make it ready.

Stopping¤

The receiver will be re-activated (will keep blocking) after the current set event is received. To stop the receiver completely, you can call stop().

Example¤
Exit after printing the first 5 numbers
import asyncio

from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.event import Event

# An integer channel with one receiver/sender pair, plus a separate event that is
# used only to tell the worker loop to stop.
channel: Anycast[int] = Anycast(name="channel")
receiver = channel.new_receiver()
sender = channel.new_sender()
stop_event = Event(name="stop")


async def do_work() -> None:
    # Iterate over whichever of the two receivers was selected.
    async for selected in select(receiver, stop_event):
        if selected_from(selected, receiver):
            print(selected.message)
        elif selected_from(selected, stop_event):
            print("Stop event triggered")
            # Mark the event as stopped so send_stuff() can observe it via
            # `is_stopped`, then leave the loop.
            stop_event.stop()
            break


async def send_stuff() -> None:
    for i in range(10):
        # Stop producing once the event has been stopped by do_work().
        if stop_event.is_stopped:
            break
        await asyncio.sleep(1)
        await sender.send(i)


async def main() -> None:
    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(do_work(), name="do_work")
        task_group.create_task(send_stuff(), name="send_stuff")
        # send_stuff() sends one number per second, so 5.5 seconds leaves room
        # for the first five numbers before the stop signal is raised.
        await asyncio.sleep(5.5)
        stop_event.set()


asyncio.run(main())
Source code in frequenz/channels/event.py
class Event(Receiver[None]):
    """A receiver that can be made ready directly.

    # Usage

    There are cases where it is useful to be able to send a signal to
    a [`select()`][frequenz.channels.select] loop, for example, to stop a loop from
    outside the loop itself.

    To do that, you can use an [`Event`][frequenz.channels.event.Event] receiver and
    call [`set()`][frequenz.channels.event.Event.set] on it when you want to make it
    ready.

    # Stopping

    The receiver will be re-activated (will keep blocking) after the current set
    event is received. To stop the receiver completely, you can call
    [`stop()`][frequenz.channels.event.Event.stop].

    # Example

    Example: Exit after printing the first 5 numbers
        ```python
        import asyncio

        from frequenz.channels import Anycast, select, selected_from
        from frequenz.channels.event import Event

        channel: Anycast[int] = Anycast(name="channel")
        receiver = channel.new_receiver()
        sender = channel.new_sender()
        stop_event = Event(name="stop")


        async def do_work() -> None:
            async for selected in select(receiver, stop_event):
                if selected_from(selected, receiver):
                    print(selected.message)
                elif selected_from(selected, stop_event):
                    print("Stop event triggered")
                    stop_event.stop()
                    break


        async def send_stuff() -> None:
            for i in range(10):
                if stop_event.is_stopped:
                    break
                await asyncio.sleep(1)
                await sender.send(i)


        async def main() -> None:
            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(do_work(), name="do_work")
                task_group.create_task(send_stuff(), name="send_stuff")
                await asyncio.sleep(5.5)
                stop_event.set()


        asyncio.run(main())
        ```
    """

    def __init__(self, *, name: str | None = None) -> None:
        """Initialize this event.

        The event starts unset and not stopped.

        Args:
            name: The name of the receiver.  If `None` an `id(self)`-based name will be
                used. This is only for debugging purposes, it will be shown in the
                string representation of the receiver.
        """
        self._event: _asyncio.Event = _asyncio.Event()
        """The event that is set when the receiver is ready."""

        self._name: str = f"{id(self):_}" if name is None else name
        """The name of the receiver.

        This is for debugging purposes, it will be shown in the string representation
        of the receiver.
        """

        self._is_set: bool = False
        """Whether the receiver is ready to be consumed.

        This is used to differentiate between when the receiver was stopped (the event
        is triggered too) but still there is an event to be consumed and when it was
        stopped but was not explicitly set().
        """

        self._is_stopped: bool = False
        """Whether the receiver is stopped."""

    @property
    def name(self) -> str:
        """The name of this receiver.

        This is for debugging purposes, it will be shown in the string representation
        of this receiver.
        """
        return self._name

    @property
    def is_set(self) -> bool:
        """Whether this receiver is set (ready)."""
        return self._is_set

    @property
    def is_stopped(self) -> bool:
        """Whether this receiver is stopped."""
        return self._is_stopped

    def stop(self) -> None:
        """Stop this receiver.

        The internal event is triggered as well, so a pending
        [`ready()`][frequenz.channels.event.Event.ready] call wakes up and reports
        that the receiver is no longer running.
        """
        self._is_stopped = True
        self._event.set()

    def set(self) -> None:
        """Trigger the event (make the receiver ready)."""
        self._is_set = True
        self._event.set()

    async def ready(self) -> bool:
        """Wait until this receiver is ready.

        If the receiver is already stopped this returns immediately. Otherwise it
        waits until either `set()` or `stop()` triggers the internal event.

        Returns:
            Whether this receiver is still running.
        """
        if self._is_stopped:
            return False
        await self._event.wait()
        # `stop()` also triggers the event, so the flag must be re-checked after
        # waking up.
        return not self._is_stopped

    def consume(self) -> None:
        """Consume the event.

        This makes this receiver wait again until the event is set again.

        An event that was set before the receiver was stopped can still be consumed
        once; only a stopped receiver without a pending set event raises.

        Raises:
            ReceiverStoppedError: If this receiver is stopped.
        """
        if not self._is_set and self._is_stopped:
            raise ReceiverStoppedError(self)

        assert self._is_set, "calls to `consume()` must follow a call to `ready()`"

        self._is_set = False
        self._event.clear()

    def __str__(self) -> str:
        """Return a string representation of this event."""
        return f"{type(self).__name__}({self._name!r})"

    def __repr__(self) -> str:
        """Return a string representation of this event."""
        return (
            f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
            f"is_stopped={self.is_stopped!r}>"
        )
Attributes¤
is_set property ¤
is_set: bool

Whether this receiver is set (ready).

is_stopped property ¤
is_stopped: bool

Whether this receiver is stopped.

name property ¤
name: str

The name of this receiver.

This is for debugging purposes, it will be shown in the string representation of this receiver.

Functions¤
__aiter__ ¤
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    return self
__anext__ async ¤
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Wait for and return the next message of the async iteration.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        message = self.consume()
    except ReceiverStoppedError as stopped:
        # A stopped receiver is how the async iteration protocol learns that
        # there is nothing left to iterate over.
        raise StopAsyncIteration() from stopped
    return message
__init__ ¤
__init__(*, name: str | None = None) -> None

Initialize this event.

PARAMETER DESCRIPTION
name

The name of the receiver. If None an id(self)-based name will be used. This is only for debugging purposes, it will be shown in the string representation of the receiver.

TYPE: str | None DEFAULT: None

Source code in frequenz/channels/event.py
def __init__(self, *, name: str | None = None) -> None:
    """Initialize this event.

    Args:
        name: The name of the receiver.  If `None` an `id(self)`-based name will be
            used. This is only for debugging purposes, it will be shown in the
            string representation of the receiver.
    """
    self._event: _asyncio.Event = _asyncio.Event()
    """The event that is set when the receiver is ready."""

    self._name: str = f"{id(self):_}" if name is None else name
    """The name of the receiver.

    This is for debugging purposes, it will be shown in the string representation
    of the receiver.
    """

    self._is_set: bool = False
    """Whether the receiver is ready to be consumed.

    This is used to differentiate between when the receiver was stopped (the event
    is triggered too) but still there is an event to be consumed and when it was
    stopped but was not explicitly set().
    """

    self._is_stopped: bool = False
    """Whether the receiver is stopped."""
__repr__ ¤
__repr__() -> str

Return a string representation of this event.

Source code in frequenz/channels/event.py
def __repr__(self) -> str:
    """Build the debug representation, including name and state flags."""
    fields = (
        f"name={self._name!r}",
        f"is_set={self.is_set!r}",
        f"is_stopped={self.is_stopped!r}",
    )
    return f"<{type(self).__name__} {' '.join(fields)}>"
__str__ ¤
__str__() -> str

Return a string representation of this event.

Source code in frequenz/channels/event.py
def __str__(self) -> str:
    """Build the short representation: the class name and the event name."""
    class_name = type(self).__name__
    return f"{class_name}({self._name!r})"
consume ¤
consume() -> None

Consume the event.

This makes this receiver wait again until the event is set again.

RAISES DESCRIPTION
ReceiverStoppedError

If this receiver is stopped.

Source code in frequenz/channels/event.py
def consume(self) -> None:
    """Consume the event.

    This makes this receiver wait again until the event is set again.

    An event that was set before the receiver was stopped can still be consumed
    once; only a stopped receiver without a pending set event raises.

    Raises:
        ReceiverStoppedError: If this receiver is stopped.
    """
    if not self._is_set and self._is_stopped:
        raise ReceiverStoppedError(self)

    assert self._is_set, "calls to `consume()` must follow a call to `ready()`"

    # Reset both the flag and the underlying event so the next wait blocks again.
    self._is_set = False
    self._event.clear()
filter ¤
filter(
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[
            [ReceiverMessageT_co],
            TypeGuard[FilteredMessageT_co],
        ]
    )
) -> (
    Receiver[ReceiverMessageT_co]
    | Receiver[FilteredMessageT_co]
)

Apply a filter function on the messages on a receiver.

Note

You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
filter_function

The function to be applied on incoming messages to determine if they should be received.

TYPE: Callable[[ReceiverMessageT_co], bool] | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]

RETURNS DESCRIPTION
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]

A new receiver that only receives messages that pass the filter.

Source code in frequenz/channels/_receiver.py
def filter(
    self,
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]
    ),
    /,
) -> Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]:
    """Create a receiver that only lets through messages accepted by a filter.

    Note:
        The filter function may be a [type guard][typing.TypeGuard], in which
        case the type of the messages that pass the filter is narrowed.

    Tip:
        The type of the returned receiver does not expose every method of the
        original receiver. To use methods of the original receiver that are not
        part of the `Receiver` interface, keep a reference to the original
        receiver and call them on it.

    Args:
        filter_function: Called on each incoming message to decide whether it
            should be received.

    Returns:
        A new receiver that only receives messages that pass the filter.
    """
    filtered = _Filter(receiver=self, filter_function=filter_function)
    return filtered
map ¤

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Create a receiver that transforms each received message with a function.

    Tip:
        The type of the returned receiver does not expose every method of the
        original receiver. To use methods of the original receiver that are not
        part of the `Receiver` interface, keep a reference to the original
        receiver and call them on it.

    Args:
        mapping_function: Called on each incoming message to produce the message
            that is actually received.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    mapped = _Mapper(receiver=self, mapping_function=mapping_function)
    return mapped
ready async ¤
ready() -> bool

Wait until this receiver is ready.

RETURNS DESCRIPTION
bool

Whether this receiver is still running.

Source code in frequenz/channels/event.py
async def ready(self) -> bool:
    """Block until the event is triggered, unless already stopped.

    Returns:
        Whether this receiver is still running.
    """
    if not self._is_stopped:
        await self._event.wait()
        # The receiver may have been stopped while waiting, so look at the flag
        # again instead of assuming it is still running.
        return not self._is_stopped
    return False
receive async ¤
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Receive a message.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If there is some problem with the receiver.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await anext(self)
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            and exc.__cause__.receiver is self
        ):
            # This is a false positive, we are actually checking __cause__ is a
            # ReceiverStoppedError which is an exception.
            raise exc.__cause__  # pylint: disable=raising-non-exception
        raise ReceiverStoppedError(self) from exc
    return received
set ¤
set() -> None

Trigger the event (make the receiver ready).

Source code in frequenz/channels/event.py
def set(self) -> None:
    """Make the receiver ready by triggering the event."""
    # Record that there is an event to consume before waking up any waiter.
    self._is_set = True
    self._event.set()
stop ¤
stop() -> None

Stop this receiver.

Source code in frequenz/channels/event.py
def stop(self) -> None:
    """Mark this receiver as stopped and trigger the event."""
    # Flag the receiver as stopped first, then trigger the underlying event.
    self._is_stopped = True
    self._event.set()
triggered ¤
triggered(
    selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]

Check whether this receiver was selected by select().

This method is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

RETURNS DESCRIPTION
TypeGuard[Selected[ReceiverMessageT_co]]

Whether this receiver was selected.

Source code in frequenz/channels/_receiver.py
def triggered(
    self, selected: Selected[Any]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
    """Check whether this receiver was selected by [`select()`][frequenz.channels.select].

    This method is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.

    Returns:
        Whether this receiver was selected.
    """
    if handled := selected._recv is self:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled