Skip to content

bidirectional

frequenz.channels.bidirectional ¤

An abstraction to provide bi-directional communication between actors.

Classes¤

frequenz.channels.bidirectional.Bidirectional ¤

Bases: Generic[T, U]

A wrapper class for simulating bidirectional channels.

Source code in frequenz/channels/bidirectional.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Bidirectional(Generic[T, U]):
    """Two `Broadcast` channels presented as a single two-way link.

    One channel carries requests (values of type `T`) and the other carries
    responses (values of type `U`). Each end of the link gets its own
    `BidirectionalHandle`, built from the sender of one channel and the
    receiver of the other.
    """

    def __init__(self, client_id: str, service_id: str) -> None:
        """Build the request/response channels and the two handles.

        Args:
            client_id: Name of the client end; it is embedded in the names of
                both channels.
            service_id: Name of the service end; it is embedded in the names
                of both channels.
        """
        self._client_id = client_id

        self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
        self._response_channel: Broadcast[U] = Broadcast(
            f"resp_{service_id}_{client_id}"
        )

        # Client end: sends on the request channel, receives on the response
        # channel.
        request_sender = self._request_channel.get_sender()
        response_receiver = self._response_channel.get_receiver()
        self._client_handle = BidirectionalHandle(request_sender, response_receiver)

        # Service end: the mirror image of the client end.
        response_sender = self._response_channel.get_sender()
        request_receiver = self._request_channel.get_receiver()
        self._service_handle = BidirectionalHandle(response_sender, request_receiver)

    @property
    def client_handle(self) -> BidirectionalHandle[T, U]:
        """Return the handle meant for the client end.

        Returns:
            The handle that sends on the request channel and receives from
                the response channel.
        """
        return self._client_handle

    @property
    def service_handle(self) -> BidirectionalHandle[U, T]:
        """Return the handle meant for the service end.

        Returns:
            The handle that sends on the response channel and receives from
                the request channel.
        """
        return self._service_handle
Functions¤
__init__(client_id, service_id) ¤

Create a Bidirectional instance.

PARAMETER DESCRIPTION
client_id

A name for the client, used to name the channels.

TYPE: str

service_id

A name for the service end of the channels.

TYPE: str

Source code in frequenz/channels/bidirectional.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, client_id: str, service_id: str) -> None:
    """Build the request/response channels and the two handles.

    Args:
        client_id: Name of the client end; it is embedded in the names of
            both channels.
        service_id: Name of the service end; it is embedded in the names of
            both channels.
    """
    self._client_id = client_id

    self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
    self._response_channel: Broadcast[U] = Broadcast(
        f"resp_{service_id}_{client_id}"
    )

    # Client end: sends on the request channel, receives on the response
    # channel.
    request_sender = self._request_channel.get_sender()
    response_receiver = self._response_channel.get_receiver()
    self._client_handle = BidirectionalHandle(request_sender, response_receiver)

    # Service end: the mirror image of the client end.
    response_sender = self._response_channel.get_sender()
    request_receiver = self._request_channel.get_receiver()
    self._service_handle = BidirectionalHandle(response_sender, request_receiver)
client_handle() property ¤

Get a BidirectionalHandle for the client to use.

RETURNS DESCRIPTION
BidirectionalHandle[T, U]

Object to send/receive messages with.

Source code in frequenz/channels/bidirectional.py
39
40
41
42
43
44
45
46
@property
def client_handle(self) -> BidirectionalHandle[T, U]:
    """Return the handle meant for the client end.

    Returns:
        The stored client-side handle, used to send and receive messages.
    """
    return self._client_handle
service_handle() property ¤

Get a BidirectionalHandle for the service to use.

RETURNS DESCRIPTION
BidirectionalHandle[U, T]

Object to send/receive messages with.

Source code in frequenz/channels/bidirectional.py
48
49
50
51
52
53
54
55
@property
def service_handle(self) -> BidirectionalHandle[U, T]:
    """Return the handle meant for the service end.

    Returns:
        The stored service-side handle, used to send and receive messages.
    """
    return self._service_handle

frequenz.channels.bidirectional.BidirectionalHandle ¤

Bases: Sender[T], Receiver[U]

A handle to a Bidirectional instance.

It can be used to send/receive values between the client and service.

Source code in frequenz/channels/bidirectional.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class BidirectionalHandle(Sender[T], Receiver[U]):
    """One end of a [Bidirectional][frequenz.channels.Bidirectional] link.

    The handle bundles a sender and a receiver so that a single object can be
    used both to send values to, and to receive values from, the other end.
    """

    def __init__(self, sender: Sender[T], receiver: Receiver[U]) -> None:
        """Wrap a sender/receiver pair in a single handle.

        Args:
            sender: The sender that outgoing values are forwarded to.
            receiver: The receiver that incoming values are read from.
        """
        self._receiver = receiver
        self._sender = sender

    async def send(self, msg: T) -> bool:
        """Forward a value to the wrapped sender.

        Args:
            msg: The value to send.

        Returns:
            Whatever the wrapped sender reports: whether the send was
                successful or not.
        """
        was_sent = await self._sender.send(msg)
        return was_sent

    async def receive(self) -> Optional[U]:
        """Wait for the next value from the wrapped receiver.

        Returns:
            The received value, or `None` if the channels are closed.
        """
        incoming = await self._receiver.receive()
        return incoming
Functions¤
__init__(sender, receiver) ¤

Create a BidirectionalHandle instance.

PARAMETER DESCRIPTION
sender

A sender to send values with.

TYPE: Sender[T]

receiver

A receiver to receive values from.

TYPE: Receiver[U]

Source code in frequenz/channels/bidirectional.py
64
65
66
67
68
69
70
71
72
def __init__(self, sender: Sender[T], receiver: Receiver[U]) -> None:
    """Wrap a sender/receiver pair in a single handle.

    Args:
        sender: The sender that outgoing values are forwarded to.
        receiver: The receiver that incoming values are read from.
    """
    self._receiver = receiver
    self._sender = sender
receive() async ¤

Receive a value from the other side.

RETURNS DESCRIPTION
Optional[U]

Received value, or None if the channels are closed.

Source code in frequenz/channels/bidirectional.py
85
86
87
88
89
90
91
async def receive(self) -> Optional[U]:
    """Wait for the next value from the wrapped receiver.

    Returns:
        The received value, or `None` if the channels are closed.
    """
    incoming = await self._receiver.receive()
    return incoming
send(msg) async ¤

Send a value to the other side.

PARAMETER DESCRIPTION
msg

The value to send.

TYPE: T

RETURNS DESCRIPTION
bool

Whether the send was successful or not.

Source code in frequenz/channels/bidirectional.py
74
75
76
77
78
79
80
81
82
83
async def send(self, msg: T) -> bool:
    """Forward a value to the wrapped sender.

    Args:
        msg: The value to send.

    Returns:
        Whatever the wrapped sender reports: whether the send was
            successful or not.
    """
    was_sent = await self._sender.send(msg)
    return was_sent