Skip to content

client

frequenz.client.dispatch.test.client ¤

Fake client for testing.

Attributes¤

frequenz.client.dispatch.test.client.ALL_KEY module-attribute ¤

ALL_KEY = 'all'

Key that has access to all resources in the FakeService.

frequenz.client.dispatch.test.client.NONE_KEY module-attribute ¤

NONE_KEY = 'none'

Key that has no access to any resources in the FakeService.

Classes¤

frequenz.client.dispatch.test.client.FakeClient ¤

Bases: Client

Fake client for testing.

This client uses a fake service to simulate the dispatch api.

Source code in frequenz/client/dispatch/test/client.py
class FakeClient(Client):
    """Fake client for testing.

    This client uses a fake service to simulate the dispatch api.
    """

    def __init__(
        self,
    ) -> None:
        """Initialize the mock client."""
        super().__init__(server_url="mock", key=ALL_KEY, connect=False)
        self._stuba: FakeService = FakeService()

    @property
    def stub(self) -> FakeService:  # type: ignore
        """The fake service.

        Returns:
            FakeService: The fake service.
        """
        return self._stuba

    def dispatches(self, microgrid_id: int) -> list[Dispatch]:
        """List of dispatches.

        Args:
            microgrid_id: The microgrid id.

        Returns:
            list[Dispatch]: The list of dispatches
        """
        return self._service.dispatches.get(microgrid_id, [])

    def set_dispatches(self, microgrid_id: int, value: list[Dispatch]) -> None:
        """Set the list of dispatches.

        Args:
            microgrid_id: The microgrid id.
            value: The list of dispatches to set.
        """
        self._service.dispatches[microgrid_id] = value

    @property
    def _service(self) -> FakeService:
        """The fake service.

        Returns:
            FakeService: The fake service.
        """
        return self._stuba
Attributes¤
channel property ¤
channel: Channel

The underlying gRPC channel used to communicate with the server.

Warning

This channel is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

channel_defaults property ¤
channel_defaults: ChannelOptions

The default options for the gRPC channel.

is_connected property ¤
is_connected: bool

Whether the client is connected to the server.

server_url property ¤
server_url: str

The URL of the server.

streams class-attribute instance-attribute ¤
streams: dict[
    int,
    GrpcStreamBroadcaster[
        StreamMicrogridDispatchesResponse, DispatchEvent
    ],
] = {}

A dictionary of streamers, keyed by microgrid_id.

stub property ¤
stub: FakeService

The fake service.

RETURNS DESCRIPTION
FakeService

The fake service.

TYPE: FakeService

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter a context manager.

Source code in frequenz/client/base/client.py
async def __aenter__(self) -> Self:
    """Enter a context manager."""
    self.connect()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None

Exit a context manager.

Source code in frequenz/client/base/client.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None:
    """Exit a context manager."""
    if self._channel is None:
        return None
    result = await self._channel.__aexit__(_exc_type, _exc_val, _exc_tb)
    self._channel = None
    self._stub = None
    return result
__init__ ¤
__init__() -> None

Initialize the mock client.

Source code in frequenz/client/dispatch/test/client.py
def __init__(
    self,
) -> None:
    """Initialize the mock client."""
    super().__init__(server_url="mock", key=ALL_KEY, connect=False)
    self._stuba: FakeService = FakeService()
connect ¤
connect(server_url: str | None = None) -> None

Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL, this method does nothing. If you want to force a reconnection, you can call disconnect() first.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to. If not provided, the previously used URL is used.

TYPE: str | None DEFAULT: None

Source code in frequenz/client/base/client.py
def connect(self, server_url: str | None = None) -> None:
    """Connect to the server, possibly using a new URL.

    If the client is already connected and the URL is the same as the previous URL,
    this method does nothing. If you want to force a reconnection, you can call
    [disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

    Args:
        server_url: The URL of the server to connect to. If not provided, the
            previously used URL is used.
    """
    if server_url is not None and server_url != self._server_url:  # URL changed
        self._server_url = server_url
    elif self.is_connected:
        return
    self._channel = parse_grpc_uri(self._server_url, self._channel_defaults)
    self._stub = self._create_stub(self._channel)
create async ¤
create(
    microgrid_id: int,
    type: str,
    start_time: datetime,
    duration: timedelta | None,
    selector: ComponentSelector,
    *,
    active: bool = True,
    dry_run: bool = False,
    payload: dict[str, Any] | None = None,
    recurrence: RecurrenceRule | None = None
) -> Dispatch

Create a dispatch.

Will try to return the created dispatch, identifying it by the same fields as the request.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to create the dispatch for.

TYPE: int

type

User defined string to identify the dispatch type.

TYPE: str

start_time

The start time of the dispatch.

TYPE: datetime

duration

The duration of the dispatch. Can be None for infinite or no-duration dispatches (e.g. switching a component on).

TYPE: timedelta | None

selector

The component selector for the dispatch.

TYPE: ComponentSelector

active

The active status of the dispatch.

TYPE: bool DEFAULT: True

dry_run

The dry_run status of the dispatch.

TYPE: bool DEFAULT: False

payload

The payload of the dispatch.

TYPE: dict[str, Any] | None DEFAULT: None

recurrence

The recurrence rule of the dispatch.

TYPE: RecurrenceRule | None DEFAULT: None

RETURNS DESCRIPTION
Dispatch

The created dispatch

TYPE: Dispatch

RAISES DESCRIPTION
ValueError

If start_time is in the past.

Source code in frequenz/client/dispatch/_client.py
async def create(  # pylint: disable=too-many-positional-arguments
    self,
    microgrid_id: int,
    type: str,  # pylint: disable=redefined-builtin
    start_time: datetime,
    duration: timedelta | None,
    selector: ComponentSelector,
    *,
    active: bool = True,
    dry_run: bool = False,
    payload: dict[str, Any] | None = None,
    recurrence: RecurrenceRule | None = None,
) -> Dispatch:
    """Create a dispatch.

    Will try to return the created dispatch, identifying it by
    the same fields as the request.

    Args:
        microgrid_id: The microgrid_id to create the dispatch for.
        type: User defined string to identify the dispatch type.
        start_time: The start time of the dispatch.
        duration: The duration of the dispatch. Can be `None` for infinite
            or no-duration dispatches (e.g. switching a component on).
        selector: The component selector for the dispatch.
        active: The active status of the dispatch.
        dry_run: The dry_run status of the dispatch.
        payload: The payload of the dispatch.
        recurrence: The recurrence rule of the dispatch.

    Returns:
        Dispatch: The created dispatch

    Raises:
        ValueError: If start_time is in the past.
    """
    if start_time <= datetime.now(tz=start_time.tzinfo):
        raise ValueError("start_time must not be in the past")

    # Raise if it's not UTC
    if start_time.tzinfo is None or start_time.tzinfo.utcoffset(start_time) is None:
        raise ValueError("start_time must be timezone aware")

    request = DispatchCreateRequest(
        microgrid_id=microgrid_id,
        type=type,
        start_time=start_time,
        duration=duration,
        selector=selector,
        active=active,
        dry_run=dry_run,
        payload=payload or {},
        recurrence=recurrence,
    )

    response = await cast(
        Awaitable[CreateMicrogridDispatchResponse],
        self.stub.CreateMicrogridDispatch(
            request.to_protobuf(), metadata=self._metadata
        ),
    )

    return Dispatch.from_protobuf(response.dispatch)
delete async ¤
delete(*, microgrid_id: int, dispatch_id: int) -> None

Delete a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to delete the dispatch for.

TYPE: int

dispatch_id

The dispatch_id to delete.

TYPE: int

Source code in frequenz/client/dispatch/_client.py
async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None:
    """Delete a dispatch.

    Args:
        microgrid_id: The microgrid_id to delete the dispatch for.
        dispatch_id: The dispatch_id to delete.
    """
    request = DeleteMicrogridDispatchRequest(
        dispatch_id=dispatch_id, microgrid_id=microgrid_id
    )
    await cast(
        Awaitable[None],
        self.stub.DeleteMicrogridDispatch(request, metadata=self._metadata),
    )
disconnect async ¤
disconnect() -> None

Disconnect from the server.

If the client is not connected, this method does nothing.

Source code in frequenz/client/base/client.py
async def disconnect(self) -> None:
    """Disconnect from the server.

    If the client is not connected, this method does nothing.
    """
    await self.__aexit__(None, None, None)
dispatches ¤
dispatches(microgrid_id: int) -> list[Dispatch]

List of dispatches.

PARAMETER DESCRIPTION
microgrid_id

The microgrid id.

TYPE: int

RETURNS DESCRIPTION
list[Dispatch]

list[Dispatch]: The list of dispatches

Source code in frequenz/client/dispatch/test/client.py
def dispatches(self, microgrid_id: int) -> list[Dispatch]:
    """List of dispatches.

    Args:
        microgrid_id: The microgrid id.

    Returns:
        list[Dispatch]: The list of dispatches
    """
    return self._service.dispatches.get(microgrid_id, [])
get async ¤
get(*, microgrid_id: int, dispatch_id: int) -> Dispatch

Get a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to get the dispatch for.

TYPE: int

dispatch_id

The dispatch_id to get.

TYPE: int

RETURNS DESCRIPTION
Dispatch

The dispatch.

TYPE: Dispatch

Source code in frequenz/client/dispatch/_client.py
async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
    """Get a dispatch.

    Args:
        microgrid_id: The microgrid_id to get the dispatch for.
        dispatch_id: The dispatch_id to get.

    Returns:
        Dispatch: The dispatch.
    """
    request = GetMicrogridDispatchRequest(
        dispatch_id=dispatch_id, microgrid_id=microgrid_id
    )
    response = await cast(
        Awaitable[GetMicrogridDispatchResponse],
        self.stub.GetMicrogridDispatch(request, metadata=self._metadata),
    )
    return Dispatch.from_protobuf(response.dispatch)
list async ¤
list(
    microgrid_id: int,
    *,
    component_selectors: Iterator[ComponentSelector] = iter(
        ()
    ),
    start_from: datetime | None = None,
    start_to: datetime | None = None,
    end_from: datetime | None = None,
    end_to: datetime | None = None,
    active: bool | None = None,
    dry_run: bool | None = None,
    page_size: int | None = None
) -> AsyncIterator[Iterator[Dispatch]]

List dispatches.

This function handles pagination internally and returns an async iterator over the dispatches. Pagination parameters like page_size and page_token can be used, but they are mutually exclusive.

Example usage:

client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
async for page in client.list(microgrid_id=1):
    for dispatch in page:
        print(dispatch)
PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to list dispatches for.

TYPE: int

component_selectors

optional, list of component ids or categories to filter by.

TYPE: Iterator[ComponentSelector] DEFAULT: iter(())

start_from

optional, filter by start_time >= start_from.

TYPE: datetime | None DEFAULT: None

start_to

optional, filter by start_time < start_to.

TYPE: datetime | None DEFAULT: None

end_from

optional, filter by end_time >= end_from.

TYPE: datetime | None DEFAULT: None

end_to

optional, filter by end_time < end_to.

TYPE: datetime | None DEFAULT: None

active

optional, filter by active status.

TYPE: bool | None DEFAULT: None

dry_run

optional, filter by dry_run status.

TYPE: bool | None DEFAULT: None

page_size

optional, number of dispatches to return per page.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
AsyncIterator[Iterator[Dispatch]]

An async iterator over pages of dispatches.

YIELDS DESCRIPTION
AsyncIterator[Iterator[Dispatch]]

A page of dispatches over which you can lazily iterate.

Source code in frequenz/client/dispatch/_client.py
async def list(
    self,
    microgrid_id: int,
    *,
    component_selectors: Iterator[ComponentSelector] = iter(()),
    start_from: datetime | None = None,
    start_to: datetime | None = None,
    end_from: datetime | None = None,
    end_to: datetime | None = None,
    active: bool | None = None,
    dry_run: bool | None = None,
    page_size: int | None = None,
) -> AsyncIterator[Iterator[Dispatch]]:
    """List dispatches.

    This function handles pagination internally and returns an async iterator
    over the dispatches. Pagination parameters like `page_size` and `page_token`
    can be used, but they are mutually exclusive.

    Example usage:

    ```python
    client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
    async for page in client.list(microgrid_id=1):
        for dispatch in page:
            print(dispatch)
    ```

    Args:
        microgrid_id: The microgrid_id to list dispatches for.
        component_selectors: optional, list of component ids or categories to filter by.
        start_from: optional, filter by start_time >= start_from.
        start_to: optional, filter by start_time < start_to.
        end_from: optional, filter by end_time >= end_from.
        end_to: optional, filter by end_time < end_to.
        active: optional, filter by active status.
        dry_run: optional, filter by dry_run status.
        page_size: optional, number of dispatches to return per page.

    Returns:
        An async iterator over pages of dispatches.

    Yields:
        A page of dispatches over which you can lazily iterate.
    """

    def to_interval(
        from_: datetime | None, to: datetime | None
    ) -> PBTimeIntervalFilter | None:
        return (
            PBTimeIntervalFilter(
                **{"from": to_timestamp(from_)}, to=to_timestamp(to)
            )
            if from_ or to
            else None
        )

    # Setup parameters
    start_time_interval = to_interval(start_from, start_to)
    end_time_interval = to_interval(end_from, end_to)
    selectors = list(map(component_selector_to_protobuf, component_selectors))
    filters = DispatchFilter(
        selectors=selectors,
        start_time_interval=start_time_interval,
        end_time_interval=end_time_interval,
        is_active=active,
        is_dry_run=dry_run,
    )

    request = ListMicrogridDispatchesRequest(
        microgrid_id=microgrid_id,
        filter=filters,
        pagination_params=(
            PaginationParams(page_size=page_size) if page_size else None
        ),
    )

    while True:
        response = await cast(
            Awaitable[ListMicrogridDispatchesResponse],
            self.stub.ListMicrogridDispatches(request, metadata=self._metadata),
        )

        yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)

        if len(response.pagination_info.next_page_token):
            request.pagination_params.CopyFrom(
                PaginationParams(
                    page_token=response.pagination_info.next_page_token
                )
            )
        else:
            break
set_dispatches ¤
set_dispatches(
    microgrid_id: int, value: list[Dispatch]
) -> None

Set the list of dispatches.

PARAMETER DESCRIPTION
microgrid_id

The microgrid id.

TYPE: int

value

The list of dispatches to set.

TYPE: list[Dispatch]

Source code in frequenz/client/dispatch/test/client.py
def set_dispatches(self, microgrid_id: int, value: list[Dispatch]) -> None:
    """Set the list of dispatches.

    Args:
        microgrid_id: The microgrid id.
        value: The list of dispatches to set.
    """
    self._service.dispatches[microgrid_id] = value
stream ¤
stream(microgrid_id: int) -> Receiver[DispatchEvent]

Receive a stream of dispatch events.

This function returns a receiver channel that can be used to receive dispatch events. An event is one of [CREATE, UPDATE, DELETE].

Example usage:

client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
async for message in client.stream(microgrid_id=1):
    print(message.event, message.dispatch)
PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to receive dispatches for.

TYPE: int

RETURNS DESCRIPTION
Receiver[DispatchEvent]

A receiver channel to receive the stream of dispatch events.

Source code in frequenz/client/dispatch/_client.py
def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
    """Receive a stream of dispatch events.

    This function returns a receiver channel that can be used to receive
    dispatch events.
    An event is one of [CREATE, UPDATE, DELETE].

    Example usage:

    ```
    client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
    async for message in client.stream(microgrid_id=1):
        print(message.event, message.dispatch)
    ```

    Args:
        microgrid_id: The microgrid_id to receive dispatches for.

    Returns:
        A receiver channel to receive the stream of dispatch events.
    """
    return self._get_stream(microgrid_id).new_receiver()
update async ¤
update(
    *,
    microgrid_id: int,
    dispatch_id: int,
    new_fields: dict[str, Any]
) -> Dispatch

Update a dispatch.

The new_fields argument is a dictionary of fields to update. The keys are the field names, and the values are the new values for the fields.

For recurrence fields, the keys are preceeded by "recurrence.".

Note that updating type and dry_run is not supported.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to update the dispatch for.

TYPE: int

dispatch_id

The dispatch_id to update.

TYPE: int

new_fields

The fields to update.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
Dispatch

The updated dispatch.

TYPE: Dispatch

RAISES DESCRIPTION
ValueError

If updating type or dry_run.

Source code in frequenz/client/dispatch/_client.py
async def update(
    self,
    *,
    microgrid_id: int,
    dispatch_id: int,
    new_fields: dict[str, Any],
) -> Dispatch:
    """Update a dispatch.

    The `new_fields` argument is a dictionary of fields to update. The keys are
    the field names, and the values are the new values for the fields.

    For recurrence fields, the keys are preceeded by "recurrence.".

    Note that updating `type` and `dry_run` is not supported.

    Args:
        microgrid_id: The microgrid_id to update the dispatch for.
        dispatch_id: The dispatch_id to update.
        new_fields: The fields to update.

    Returns:
        Dispatch: The updated dispatch.

    Raises:
        ValueError: If updating `type` or `dry_run`.
    """
    msg = UpdateMicrogridDispatchRequest(
        dispatch_id=dispatch_id, microgrid_id=microgrid_id
    )

    for key, val in new_fields.items():
        path = key.split(".")

        match path[0]:
            case "start_time":
                msg.update.start_time.CopyFrom(to_timestamp(val))
            case "duration":
                if val is None:
                    msg.update.ClearField("duration")
                else:
                    msg.update.duration = round(val.total_seconds())
            case "selector":
                msg.update.selector.CopyFrom(component_selector_to_protobuf(val))
            case "is_active":
                msg.update.is_active = val
            case "payload":
                msg.update.payload.update(val)
            case "active":
                msg.update.is_active = val
                key = "is_active"
            case "recurrence":
                match path[1]:
                    case "freq":
                        msg.update.recurrence.freq = val
                    # Proto uses "freq" instead of "frequency"
                    case "frequency":
                        msg.update.recurrence.freq = val
                        # Correct the key to "recurrence.freq"
                        key = "recurrence.freq"
                    case "interval":
                        msg.update.recurrence.interval = val
                    case "end_criteria":
                        msg.update.recurrence.end_criteria.CopyFrom(
                            val.to_protobuf()
                        )
                    case "byminutes":
                        msg.update.recurrence.byminutes.extend(val)
                    case "byhours":
                        msg.update.recurrence.byhours.extend(val)
                    case "byweekdays":
                        msg.update.recurrence.byweekdays.extend(val)
                    case "bymonthdays":
                        msg.update.recurrence.bymonthdays.extend(val)
                    case "bymonths":
                        msg.update.recurrence.bymonths.extend(val)
                    case _:
                        raise ValueError(f"Unknown recurrence field: {path[1]}")
            case _:
                raise ValueError(f"Unknown field: {path[0]}")

        msg.update_mask.paths.append(key)

    response = await cast(
        Awaitable[UpdateMicrogridDispatchResponse],
        self.stub.UpdateMicrogridDispatch(msg, metadata=self._metadata),
    )

    return Dispatch.from_protobuf(response.dispatch)

Functions¤

frequenz.client.dispatch.test.client.to_create_params ¤

to_create_params(
    microgrid_id: int, dispatch: Dispatch
) -> dict[str, Any]

Convert a dispatch to client.create parameters.

PARAMETER DESCRIPTION
microgrid_id

The microgrid id.

TYPE: int

dispatch

The dispatch to convert.

TYPE: Dispatch

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The create parameters.

Source code in frequenz/client/dispatch/test/client.py
def to_create_params(microgrid_id: int, dispatch: Dispatch) -> dict[str, Any]:
    """Convert a dispatch to client.create parameters.

    Args:
        microgrid_id: The microgrid id.
        dispatch: The dispatch to convert.

    Returns:
        dict[str, Any]: The create parameters.
    """
    return {
        "microgrid_id": microgrid_id,
        "type": dispatch.type,
        "start_time": dispatch.start_time,
        "duration": dispatch.duration,
        "selector": dispatch.selector,
        "active": dispatch.active,
        "dry_run": dispatch.dry_run,
        "payload": dispatch.payload,
        "recurrence": dispatch.recurrence,
    }