Skip to content

base_classes

frequenz.channels.base_classes ¤

Baseclasses for Channel Sender and Receiver.

Classes¤

frequenz.channels.base_classes.BufferedReceiver ¤

Bases: Receiver[T]

A channel receiver with a buffer.

Source code in frequenz/channels/base_classes.py
107
108
109
110
111
112
113
114
115
116
class BufferedReceiver(Receiver[T]):
    """A channel receiver with a buffer."""

    @abstractmethod
    def enqueue(self, msg: T) -> None:
        """Put a message into this buffered receiver's queue.

        Args:
            msg: The message to be added to the queue.
        """
Functions¤
enqueue(msg) abstractmethod ¤

Put a message into this buffered receiver's queue.

PARAMETER DESCRIPTION
msg

The message to be added to the queue.

TYPE: T

Source code in frequenz/channels/base_classes.py
110
111
112
113
114
115
116
@abstractmethod
def enqueue(self, msg: T) -> None:
    """Put a message into this buffered receiver's queue.

    Args:
        msg: The message to be added to the queue.
    """

frequenz.channels.base_classes.Peekable ¤

Bases: ABC, Generic[T]

A channel peekable.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.

Source code in frequenz/channels/base_classes.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class Peekable(ABC, Generic[T]):
    """A channel peekable.

    A Peekable provides a [peek()][frequenz.channels.Peekable] method that
    allows the user to get a peek at the latest value in the channel, without
    consuming anything.
    """

    @abstractmethod
    def peek(self) -> Optional[T]:
        """Return the latest value that was sent to the channel.

        Returns:
            The latest value received by the channel, and `None`, if nothing
                has been sent to the channel yet.
        """
Functions¤
peek() abstractmethod ¤

Return the latest value that was sent to the channel.

RETURNS DESCRIPTION
Optional[T]

The latest value received by the channel, and None, if nothing has been sent to the channel yet.

Source code in frequenz/channels/base_classes.py
 97
 98
 99
100
101
102
103
104
@abstractmethod
def peek(self) -> Optional[T]:
    """Return the latest value that was sent to the channel.

    Returns:
        The latest value received by the channel, and `None`, if nothing
            has been sent to the channel yet.
    """

frequenz.channels.base_classes.Receiver ¤

Bases: ABC, Generic[T]

A channel Receiver.

Source code in frequenz/channels/base_classes.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class Receiver(ABC, Generic[T]):
    """A channel Receiver."""

    @abstractmethod
    async def receive(self) -> Optional[T]:
        """Receive a message from the channel.

        Returns:
            `None`, if the channel is closed, a message otherwise.
        """

    def __aiter__(self) -> Receiver[T]:
        """Initialize the async iterator over received values.

        Returns:
            `self`, since no extra setup is needed for the iterator.
        """
        return self

    async def __anext__(self) -> T:
        """Await the next value in the async iteration over received values.

        Returns:
            The next value received.

        Raises:
            StopAsyncIteration: if we receive `None`, i.e. if the underlying
                channel is closed.
        """
        received = await self.receive()
        if received is None:
            raise StopAsyncIteration
        return received

    def map(self, call: Callable[[T], U]) -> Receiver[U]:
        """Return a receiver with `call` applied on incoming messages.

        Args:
            call: function to apply on incoming messages.

        Returns:
            A `Receiver` to read results of the given function from.
        """
        return _Map(self, call)

    def into_peekable(self) -> Peekable[T]:
        """Convert the `Receiver` implementation into a `Peekable`.

        Once this function has been called, the receiver will no longer be
        usable, and calling `receive` on the receiver will raise an exception.

        Raises:
            NotImplementedError: when a `Receiver` implementation doesn't have
                a custom `get_peekable` implementation.
        """
        raise NotImplementedError("This receiver does not implement `into_peekable`")
Functions¤
__aiter__() ¤

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Receiver[T]

self, since no extra setup is needed for the iterator.

Source code in frequenz/channels/base_classes.py
42
43
44
45
46
47
48
def __aiter__(self) -> Receiver[T]:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__() async ¤

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if we receive None, i.e. if the underlying channel is closed.

Source code in frequenz/channels/base_classes.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
async def __anext__(self) -> T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if we receive `None`, i.e. if the underlying
            channel is closed.
    """
    received = await self.receive()
    if received is None:
        raise StopAsyncIteration
    return received
into_peekable() ¤

Convert the Receiver implementation into a Peekable.

Once this function has been called, the receiver will no longer be usable, and calling receive on the receiver will raise an exception.

RAISES DESCRIPTION
NotImplementedError

when a Receiver implementation doesn't have a custom get_peekable implementation.

Source code in frequenz/channels/base_classes.py
76
77
78
79
80
81
82
83
84
85
86
def into_peekable(self) -> Peekable[T]:
    """Convert the `Receiver` implementation into a `Peekable`.

    Once this function has been called, the receiver will no longer be
    usable, and calling `receive` on the receiver will raise an exception.

    Raises:
        NotImplementedError: when a `Receiver` implementation doesn't have
            a custom `get_peekable` implementation.
    """
    raise NotImplementedError("This receiver does not implement `into_peekable`")
map(call) ¤

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[T], U]

RETURNS DESCRIPTION
Receiver[U]

A Receiver to read results of the given function from.

Source code in frequenz/channels/base_classes.py
65
66
67
68
69
70
71
72
73
74
def map(self, call: Callable[[T], U]) -> Receiver[U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
receive() abstractmethod async ¤

Receive a message from the channel.

RETURNS DESCRIPTION
Optional[T]

None, if the channel is closed, a message otherwise.

Source code in frequenz/channels/base_classes.py
34
35
36
37
38
39
40
@abstractmethod
async def receive(self) -> Optional[T]:
    """Receive a message from the channel.

    Returns:
        `None`, if the channel is closed, a message otherwise.
    """

frequenz.channels.base_classes.Sender ¤

Bases: ABC, Generic[T]

A channel Sender.

Source code in frequenz/channels/base_classes.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Sender(ABC, Generic[T]):
    """A channel Sender."""

    @abstractmethod
    async def send(self, msg: T) -> bool:
        """Send a message to the channel.

        Args:
            msg: The message to be sent.

        Returns:
            Whether the message was sent, based on whether the channel is open
                or not.
        """
Functions¤
send(msg) abstractmethod async ¤

Send a message to the channel.

PARAMETER DESCRIPTION
msg

The message to be sent.

TYPE: T

RETURNS DESCRIPTION
bool

Whether the message was sent, based on whether the channel is open or not.

Source code in frequenz/channels/base_classes.py
18
19
20
21
22
23
24
25
26
27
28
@abstractmethod
async def send(self, msg: T) -> bool:
    """Send a message to the channel.

    Args:
        msg: The message to be sent.

    Returns:
        Whether the message was sent, based on whether the channel is open
            or not.
    """