frequenz.dispatch ¤
A high-level interface for the dispatch API.
A small overview of the most important classes in this module:
- Dispatcher: The entry point for the API.
- Dispatch: A dispatch type with lots of useful extra functionality.
- DispatchManagingActor: An actor to manage other actors based on incoming dispatches.
- Created, Updated, Deleted: Dispatch event types.
Attributes¤
frequenz.dispatch.DispatchEvent
module-attribute
¤
Type that is sent over the channel for dispatch updates.
This type is used to send dispatches that were created, updated or deleted over the channel.
Classes¤
frequenz.dispatch.Created
dataclass
¤
A dispatch created event.
Source code in frequenz/dispatch/_event.py
frequenz.dispatch.Deleted
dataclass
¤
A dispatch deleted event.
Source code in frequenz/dispatch/_event.py
frequenz.dispatch.Dispatch
dataclass
¤
Bases: Dispatch
Dispatch type with extra functionality.
Source code in frequenz/dispatch/_dispatch.py
Attributes¤
active
instance-attribute
¤
active: bool
Indicates whether the dispatch is active and eligible for processing.
create_time
instance-attribute
¤
create_time: datetime
The creation time of the dispatch in UTC. Set when a dispatch is created.
dry_run
instance-attribute
¤
dry_run: bool
Indicates if the dispatch is a dry run.
A dry-run dispatch is executed for logging and monitoring only, without affecting actual component states.
duration
instance-attribute
¤
duration: timedelta | None
The duration of the dispatch, represented as a timedelta.
missed_runs
property
¤
Yield all missed runs of a dispatch.
If a running state change notification was not sent in time due to connection issues, this property yields all runs missed since the last notification that was sent.
RETURNS | DESCRIPTION |
---|---|
Iterator[datetime] | A generator that yields all missed runs of a dispatch. |
next_run
property
¤
next_run: datetime | None
Calculate the next run of a dispatch.
RETURNS | DESCRIPTION |
---|---|
datetime \| None | The next run of the dispatch or None if the dispatch is finished. |
payload
instance-attribute
¤
The dispatch payload containing arbitrary data.
It is structured as needed for the dispatch operation.
recurrence
instance-attribute
¤
The recurrence rule for the dispatch.
It defines any repeating patterns or schedules.
running_state_change_synced
class-attribute
instance-attribute
¤
running_state_change_synced: datetime | None = None
The last time a message was sent about the running state change.
started
property
¤
started: bool
Check if the dispatch has started.
A dispatch is considered started if the current time is after the start time but before the end time.
Recurring dispatches are considered started if the current time is after the start time of the last occurrence but before the end time of the last occurrence.
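The time-window rule for a non-recurring dispatch can be sketched in plain Python as shown here. This is only an illustration, not the library's implementation: the function `is_started` and its parameters are hypothetical, the start of the window is treated as inclusive, and treating a missing duration as open-ended is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_started(
    now: datetime, start_time: datetime, duration: Optional[timedelta]
) -> bool:
    """Return True if `now` lies in the window [start_time, start_time + duration)."""
    if now < start_time:
        return False
    if duration is None:
        # Assumption: a dispatch without a duration has no end time.
        return True
    return now < start_time + duration


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
five_minutes = timedelta(minutes=5)

print(is_started(start + timedelta(minutes=2), start, five_minutes))  # True
print(is_started(start + timedelta(minutes=6), start, five_minutes))  # False
```

For a recurring dispatch the same check would be applied to the start and end of the most recent occurrence rather than to the original start time.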
type
instance-attribute
¤
type: str
User-defined information about the type of dispatch.
This is understood and processed by downstream applications.
until
property
¤
until: datetime | None
Time when the dispatch should end.
Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.
RETURNS | DESCRIPTION |
---|---|
datetime \| None | The time when the dispatch should end or None if the dispatch is not running. |
update_time
instance-attribute
¤
update_time: datetime
The last update time of the dispatch in UTC. Set when a dispatch is modified.
Functions¤
__init__ ¤
__init__(
client_dispatch: Dispatch,
deleted: bool = False,
running_state_change_synced: datetime | None = None,
)
Initialize the dispatch.
PARAMETER | DESCRIPTION |
---|---|
client_dispatch | The client dispatch. TYPE: Dispatch |
deleted | Whether the dispatch is deleted. TYPE: bool |
running_state_change_synced | Timestamp of the last running state change message. TYPE: datetime \| None |
Source code in frequenz/dispatch/_dispatch.py
from_protobuf
classmethod
¤
from_protobuf(pb_object: Dispatch) -> Dispatch
Convert a protobuf dispatch to a dispatch.
PARAMETER | DESCRIPTION |
---|---|
pb_object | The protobuf dispatch to convert. TYPE: Dispatch |

RETURNS | DESCRIPTION |
---|---|
Dispatch | The converted dispatch. |
Source code in frequenz/client/dispatch/types.py
next_run_after ¤
Calculate the next run of a dispatch.
PARAMETER | DESCRIPTION |
---|---|
after | The time to calculate the next run from. |

RETURNS | DESCRIPTION |
---|---|
datetime \| None | The next run of the dispatch or None if the dispatch is finished. |
Source code in frequenz/dispatch/_dispatch.py
running ¤
running(type_: str) -> RunningState
Check if the dispatch is currently supposed to be running.
PARAMETER | DESCRIPTION |
---|---|
type_ | The type of the dispatch that should be running. TYPE: str |

RETURNS | DESCRIPTION |
---|---|
RunningState | RUNNING if the dispatch is running, STOPPED if it is stopped, DIFFERENT_TYPE if it is for a different type. |
Source code in frequenz/dispatch/_dispatch.py
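The three-way result described above can be illustrated with a small self-contained sketch. The classes `SketchRunningState` and `SketchDispatch` are hypothetical stand-ins written for this example and are not the library's types. The order of the checks (type first, then activity) is an assumption.

```python
from dataclasses import dataclass
from enum import Enum


class SketchRunningState(Enum):
    """Hypothetical stand-in for the three documented states."""

    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    DIFFERENT_TYPE = "DIFFERENT_TYPE"


@dataclass(frozen=True)
class SketchDispatch:
    """Hypothetical stand-in carrying only the fields this sketch needs."""

    type: str
    active: bool
    started: bool

    def running(self, type_: str) -> SketchRunningState:
        # A dispatch of another type is reported as such, regardless of timing.
        if self.type != type_:
            return SketchRunningState.DIFFERENT_TYPE
        # Assumption: only an active dispatch inside its time window is running.
        if self.active and self.started:
            return SketchRunningState.RUNNING
        return SketchRunningState.STOPPED


dispatch = SketchDispatch(type="DEMO_TYPE", active=True, started=True)
print(dispatch.running("DEMO_TYPE"))
print(dispatch.running("OTHER_TYPE"))
```

Returning a distinct DIFFERENT_TYPE value lets a consumer ignore dispatches meant for other actors without confusing them with a stop request.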
to_protobuf ¤
Convert a dispatch to a protobuf dispatch.
RETURNS | DESCRIPTION |
---|---|
Dispatch | The converted protobuf dispatch. |
Source code in frequenz/client/dispatch/types.py
frequenz.dispatch.DispatchManagingActor ¤
Bases: Actor
Helper class to manage actors based on dispatches.
Example usage:
import asyncio
import os
from typing import Any

from frequenz.channels import Broadcast, Receiver
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.client.dispatch.types import TargetComponents
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
from frequenz.sdk.actor import Actor


class MyActor(Actor):
    def __init__(self, updates_channel: Receiver[DispatchUpdate]):
        super().__init__()
        self._updates_channel = updates_channel
        self._dry_run: bool
        self._options: dict[str, Any]

    async def _run(self) -> None:
        while True:
            # Wait for the next update sent by the managing actor
            update = await self._updates_channel.receive()
            print("Received update:", update)

            self.set_components(update.components)
            self._dry_run = update.dry_run
            self._options = update.options

    def set_components(self, components: TargetComponents) -> None:
        match components:
            case [int(), *_] as component_ids:
                print(f"Dispatch: Setting components to {component_ids}")
            case [ComponentCategory.BATTERY, *_]:
                print("Dispatch: Using all battery components")
            case unsupported:
                print(
                    f"Dispatch: Requested an unsupported target component {unsupported!r}, "
                    "but only component IDs or category BATTERY are supported."
                )


async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )

    # Create update channel to receive dispatch update events pre-start and mid-run
    dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")

    # Start actor and give it a dispatch updates channel receiver
    my_actor = MyActor(dispatch_updates_channel.new_receiver())

    status_receiver = dispatcher.running_status_change.new_receiver()

    managing_actor = DispatchManagingActor(
        actor=my_actor,
        dispatch_type="EXAMPLE",
        running_status_receiver=status_receiver,
        updates_sender=dispatch_updates_channel.new_sender(),
    )

    await asyncio.gather(dispatcher.start(), managing_actor.start())
Source code in frequenz/dispatch/_managing_actor.py
Attributes¤
RESTART_DELAY
class-attribute
instance-attribute
¤
The delay to wait between restarts of this actor.
is_running
property
¤
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this background service is running. |
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str | The name of this background service. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior. Don't do it unless the class explicitly documents that it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]] | The set of running tasks spawned by this background service. |
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self | This background service. |
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type | The type of the exception raised, if any. TYPE: type[BaseException] \| None |
exc_val | The exception raised, if any. TYPE: BaseException \| None |
exc_tb | The traceback of the exception raised, if any. TYPE: TracebackType \| None |
Source code in frequenz/sdk/actor/_background_service.py
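The start-on-enter, stop-on-exit behavior of the async context protocol can be sketched with a self-contained stand-in. `SketchService` is a hypothetical class written for this example, not the SDK's background service. It only mirrors the documented contract: entering starts the service, exiting stops it and waits for its task.

```python
import asyncio


class SketchService:
    """Hypothetical stand-in for a background service (not the SDK class)."""

    def __init__(self) -> None:
        self._task = None  # the single background task, once started

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    def start(self) -> None:
        # Starting an already running service does nothing.
        if not self.is_running:
            self._task = asyncio.create_task(asyncio.sleep(3600))

    async def stop(self) -> None:
        # Cancel the task and wait until the cancellation has completed.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def __aenter__(self) -> "SketchService":
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.stop()


async def main() -> tuple:
    service = SketchService()
    async with service:
        inside = service.is_running
    return inside, service.is_running


print(asyncio.run(main()))  # (True, False)
```

Using `async with` guarantees that the stop step runs even when the body raises, which is usually preferable to pairing `start()` and `stop()` calls by hand.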
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None | An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
__del__ ¤
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(
actor: Actor | Set[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
updates_sender: Sender[DispatchUpdate] | None = None,
) -> None
Initialize the dispatch handler.
PARAMETER | DESCRIPTION |
---|---|
actor | A set of actors or a single actor to manage. |
dispatch_type | The type of dispatches to handle. TYPE: str |
running_status_receiver | The receiver for dispatch running status changes. |
updates_sender | The sender for dispatch events. TYPE: Sender[DispatchUpdate] \| None |
Source code in frequenz/dispatch/_managing_actor.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be passed to the tasks being cancelled. TYPE: str \| None |
Source code in frequenz/sdk/actor/_background_service.py
start ¤
Start this actor.
If this actor is already running, this method does nothing.
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be passed to the tasks being cancelled. TYPE: str \| None |

RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
wait
async
¤
Wait for this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
frequenz.dispatch.DispatchUpdate
dataclass
¤
Event emitted when the dispatch changes.
Source code in frequenz/dispatch/_managing_actor.py
frequenz.dispatch.Dispatcher ¤
A high-level interface for the dispatch API.
This class provides a high-level interface to the dispatch API. It provides two channels:
- Lifecycle events: A channel that sends a dispatch event message whenever a dispatch is created, updated or deleted.
- Running status change: A channel that sends a dispatch message whenever a dispatch is ready to be executed according to the schedule, or whenever the running status of the dispatch changes in a way that could require the consumer to start, stop or reconfigure itself.
Processing running state change dispatches
import os
from unittest.mock import MagicMock

from frequenz.dispatch import Dispatcher, RunningState


async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )
    await dispatcher.start()

    actor = MagicMock()  # replace with your actor

    changed_running_status = dispatcher.running_status_change.new_receiver()

    async for dispatch in changed_running_status:
        match dispatch.running("DEMO_TYPE"):
            case RunningState.RUNNING:
                print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                if actor.is_running:
                    actor.reconfigure(
                        components=dispatch.target,
                        run_parameters=dispatch.payload,  # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )  # this will reconfigure the actor
                else:
                    # this will start a new actor with the given components
                    # and run it for the duration of the dispatch
                    actor.start(
                        components=dispatch.target,
                        run_parameters=dispatch.payload,  # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )
            case RunningState.STOPPED:
                actor.stop()  # this will stop the actor
            case RunningState.DIFFERENT_TYPE:
                pass  # dispatch not for this type
Getting notification about dispatch lifecycle events
import os
from typing import assert_never

from frequenz.dispatch import Created, Deleted, Dispatcher, Updated


async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )
    await dispatcher.start()  # this will start the actor

    events_receiver = dispatcher.lifecycle_events.new_receiver()

    async for event in events_receiver:
        match event:
            case Created(dispatch):
                print(f"A dispatch was created: {dispatch}")
            case Deleted(dispatch):
                print(f"A dispatch was deleted: {dispatch}")
            case Updated(dispatch):
                print(f"A dispatch was updated: {dispatch}")
            case _ as unhandled:
                assert_never(unhandled)
Creating a new dispatch and then modifying it.
Note that this uses the lower-level Client class to create and update the dispatch.
import os
from datetime import datetime, timedelta, timezone

from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.dispatch import Dispatcher


async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )
    await dispatcher.start()  # this will start the actor

    # Create a new dispatch
    new_dispatch = await dispatcher.client.create(
        microgrid_id=microgrid_id,
        type="ECHO_FREQUENCY",  # replace with your own type
        start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
        duration=timedelta(minutes=5),
        target=ComponentCategory.INVERTER,
        payload={"font": "Times New Roman"},  # Arbitrary payload data
    )

    # Modify the dispatch
    await dispatcher.client.update(
        microgrid_id=microgrid_id,
        dispatch_id=new_dispatch.id,
        new_fields={"duration": timedelta(minutes=10)}
    )

    # Validate the modification
    modified_dispatch = await dispatcher.client.get(
        microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
    )
    assert modified_dispatch.duration == timedelta(minutes=10)
Source code in frequenz/dispatch/_dispatcher.py
Attributes¤
lifecycle_events
property
¤
lifecycle_events: ReceiverFetcher[DispatchEvent]
Return a receiver fetcher for created, updated or deleted dispatches.
RETURNS | DESCRIPTION |
---|---|
ReceiverFetcher[DispatchEvent] | A new receiver for new dispatches. |
running_status_change
property
¤
running_status_change: ReceiverFetcher[Dispatch]
Return running status change receiver fetcher.
This receiver will receive a message whenever the current running status of a dispatch changes.
Usually, one message per scheduled run is to be expected. However, things get complicated when a dispatch was modified:
If it was currently running and the modification now says it should not be running or running with different parameters, then a message will be sent.
In other words: Any change that is expected to make an actor start, stop or reconfigure itself with new parameters causes a message to be sent.
Note: Reaching the end time (start_time + duration) will not send a message, except when it was reached by modifying the duration.
RETURNS | DESCRIPTION |
---|---|
ReceiverFetcher[Dispatch] | A new receiver for dispatches whose running status changed. |
Functions¤
__init__ ¤
Initialize the dispatcher.
PARAMETER | DESCRIPTION |
---|---|
microgrid_id | The microgrid id. |
server_url | The URL of the dispatch service. |
key | The key to access the service. |
Source code in frequenz/dispatch/_dispatcher.py
frequenz.dispatch.ReceiverFetcher ¤
Bases: Protocol[ReceivedT_co]
An interface that just exposes a new_receiver method.
Source code in frequenz/dispatch/_dispatcher.py
Functions¤
new_receiver
abstractmethod
¤
Get a receiver from the channel.
PARAMETER | DESCRIPTION |
---|---|
name | A name to identify the receiver in the logs. |
limit | The maximum size of the receiver. |

RETURNS | DESCRIPTION |
---|---|
Receiver[ReceivedT_co] | A receiver instance. |
Source code in frequenz/dispatch/_dispatcher.py
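To illustrate how a protocol with a single `new_receiver` method can be satisfied structurally, here is a minimal self-contained sketch. All names in it (`SketchReceiverFetcher`, `ListFetcher`, `first_message`) are hypothetical. A plain iterator stands in for a real channel receiver, and the keyword-only signature and the default `limit` of 50 are assumptions made for the example.

```python
from typing import Iterator, List, Optional, Protocol, TypeVar

ReceivedT_co = TypeVar("ReceivedT_co", covariant=True)


class SketchReceiverFetcher(Protocol[ReceivedT_co]):
    """Hypothetical protocol: anything with a matching new_receiver() fits."""

    def new_receiver(
        self, *, name: Optional[str] = None, limit: int = 50
    ) -> Iterator[ReceivedT_co]:
        ...


class ListFetcher:
    """Hypothetical implementation backed by a list instead of a channel."""

    def __init__(self, messages: List[str]) -> None:
        self._messages = messages

    def new_receiver(
        self, *, name: Optional[str] = None, limit: int = 50
    ) -> Iterator[str]:
        # Hand out at most `limit` messages to each new receiver.
        return iter(self._messages[:limit])


def first_message(fetcher: SketchReceiverFetcher[str]) -> str:
    # The caller depends only on the protocol, not on a concrete channel type.
    return next(fetcher.new_receiver(name="sketch"))


print(first_message(ListFetcher(["created", "updated"])))  # created
```

Because `ListFetcher` does not inherit from the protocol, the sketch also shows why such an interface is convenient for tests: any object with a compatible `new_receiver` method can be substituted.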
frequenz.dispatch.RunningState ¤
frequenz.dispatch.Updated
dataclass
¤
A dispatch updated event.