Index
frequenz.dispatch ¤
A highlevel interface for the dispatch API.
A small overview of the most important classes in this module:
- Dispatcher: The entry point for the API.
- Dispatch: A dispatch type with lots of useful extra functionality.
- Created, Updated, Deleted: Dispatch event types.
Attributes¤
frequenz.dispatch.DispatchEvent
module-attribute
¤
Type that is sent over the channel for dispatch updates.
This type is used to send dispatches that were created, updated or deleted over the channel.
Classes¤
frequenz.dispatch.Created
dataclass
¤
A dispatch created event.
Source code in frequenz/dispatch/_event.py
frequenz.dispatch.Deleted
dataclass
¤
A dispatch deleted event.
Source code in frequenz/dispatch/_event.py
frequenz.dispatch.Dispatch
dataclass
¤
Bases: Dispatch
Dispatch type with extra functionality.
Source code in frequenz/dispatch/_dispatch.py
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
|
Attributes¤
active
instance-attribute
¤
active: bool
Indicates whether the dispatch is active and eligible for processing.
create_time
instance-attribute
¤
create_time: datetime
The creation time of the dispatch in UTC. Set when a dispatch is created.
dry_run
instance-attribute
¤
dry_run: bool
Indicates if the dispatch is a dry run.
Executed for logging and monitoring without affecting actual component states.
duration
instance-attribute
¤
duration: timedelta
The duration of the dispatch, represented as a timedelta.
microgrid_id
instance-attribute
¤
microgrid_id: int
The identifier of the microgrid to which this dispatch belongs.
missed_runs
property
¤
Yield all missed runs of a dispatch.
Yields all missed runs of a dispatch.
If a running state change notification was not sent in time due to connection issues, this method will yield all missed runs since the last sent notification.
RETURNS | DESCRIPTION |
---|---|
Iterator[datetime]
|
A generator that yields all missed runs of a dispatch. |
next_run
property
¤
next_run: datetime | None
Calculate the next run of a dispatch.
RETURNS | DESCRIPTION |
---|---|
datetime | None
|
The next run of the dispatch or None if the dispatch is finished. |
payload
instance-attribute
¤
The dispatch payload containing arbitrary data.
It is structured as needed for the dispatch operation.
recurrence
instance-attribute
¤
recurrence: RecurrenceRule
The recurrence rule for the dispatch.
Defining any repeating patterns or schedules.
running_state_change_synced
class-attribute
instance-attribute
¤
running_state_change_synced: datetime | None = None
The last time a message was sent about the running state change.
selector
instance-attribute
¤
selector: ComponentSelector
The component selector specifying which components the dispatch targets.
type
instance-attribute
¤
type: str
User-defined information about the type of dispatch.
This is understood and processed by downstream applications.
until
property
¤
until: datetime | None
Time when the dispatch should end.
Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.
RETURNS | DESCRIPTION |
---|---|
datetime | None
|
The time when the dispatch should end or None if the dispatch is not running. |
update_time
instance-attribute
¤
update_time: datetime
The last update time of the dispatch in UTC. Set when a dispatch is modified.
Functions¤
__init__ ¤
__init__(
client_dispatch: Dispatch,
deleted: bool = False,
running_state_change_synced: datetime | None = None,
)
Initialize the dispatch.
PARAMETER | DESCRIPTION |
---|---|
client_dispatch |
The client dispatch.
TYPE:
|
deleted |
Whether the dispatch is deleted.
TYPE:
|
running_state_change_synced |
Timestamp of the last running state change message.
TYPE:
|
Source code in frequenz/dispatch/_dispatch.py
from_protobuf
classmethod
¤
from_protobuf(pb_object: Dispatch) -> Dispatch
Convert a protobuf dispatch to a dispatch.
PARAMETER | DESCRIPTION |
---|---|
pb_object |
The protobuf dispatch to convert.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Dispatch
|
The converted dispatch. |
Source code in frequenz/client/dispatch/types.py
next_run_after ¤
Calculate the next run of a dispatch.
PARAMETER | DESCRIPTION |
---|---|
after |
The time to calculate the next run from.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
datetime | None
|
The next run of the dispatch or None if the dispatch is finished. |
Source code in frequenz/dispatch/_dispatch.py
running ¤
running(type_: str) -> RunningState
Check if the dispatch is currently supposed to be running.
PARAMETER | DESCRIPTION |
---|---|
type_ |
The type of the dispatch that should be running.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunningState
|
RUNNING if the dispatch is running, |
RunningState
|
STOPPED if it is stopped, |
RunningState
|
DIFFERENT_TYPE if it is for a different type. |
Source code in frequenz/dispatch/_dispatch.py
to_protobuf ¤
Convert a dispatch to a protobuf dispatch.
RETURNS | DESCRIPTION |
---|---|
Dispatch
|
The converted protobuf dispatch. |
Source code in frequenz/client/dispatch/types.py
frequenz.dispatch.Dispatcher ¤
A highlevel interface for the dispatch API.
This class provides a highlevel interface to the dispatch API. It provides two channels:
Lifecycle events
A channel that sends a dispatch event message whenever a dispatch is created, updated or deleted.
Running status change
Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the consumer to start, stop or reconfigure itself.
Processing running state change dispatches
import os
import grpc.aio
from frequenz.dispatch import Dispatcher, RunningState
from unittest.mock import MagicMock
async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
service_address = f"{host}:{port}"
grpc_channel = grpc.aio.insecure_channel(service_address)
microgrid_id = 1
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
await dispatcher.start()
actor = MagicMock() # replace with your actor
changed_running_status = dispatcher.running_status_change.new_receiver()
async for dispatch in changed_running_status:
match dispatch.running("DEMO_TYPE"):
case RunningState.RUNNING:
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
if actor.is_running:
actor.reconfigure(
components=dispatch.selector,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
) # this will reconfigure the actor
else:
# this will start a new actor with the given components
# and run it for the duration of the dispatch
actor.start(
components=dispatch.selector,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
)
case RunningState.STOPPED:
actor.stop() # this will stop the actor
case RunningState.DIFFERENT_TYPE:
pass # dispatch not for this type
Getting notification about dispatch lifecycle events
import os
from typing import assert_never
import grpc.aio
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
service_address = f"{host}:{port}"
grpc_channel = grpc.aio.insecure_channel(service_address)
microgrid_id = 1
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
dispatcher.start() # this will start the actor
events_receiver = dispatcher.lifecycle_events.new_receiver()
async for event in events_receiver:
match event:
case Created(dispatch):
print(f"A dispatch was created: {dispatch}")
case Deleted(dispatch):
print(f"A dispatch was deleted: {dispatch}")
case Updated(dispatch):
print(f"A dispatch was updated: {dispatch}")
case _ as unhandled:
assert_never(unhandled)
Creating a new dispatch and then modifying it.
Note that this uses the lower-level Client
class to create and update the dispatch.
import os
from datetime import datetime, timedelta, timezone
import grpc.aio
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.dispatch import Dispatcher
async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
service_address = f"{host}:{port}"
grpc_channel = grpc.aio.insecure_channel(service_address)
microgrid_id = 1
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
await dispatcher.start() # this will start the actor
# Create a new dispatch
new_dispatch = await dispatcher.client.create(
microgrid_id=microgrid_id,
_type="ECHO_FREQUENCY", # replace with your own type
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
duration=timedelta(minutes=5),
selector=ComponentCategory.INVERTER,
payload={"font": "Times New Roman"}, # Arbitrary payload data
)
# Modify the dispatch
await dispatcher.client.update(
dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
)
# Validate the modification
modified_dispatch = await dispatcher.client.get(new_dispatch.id)
assert modified_dispatch.duration == timedelta(minutes=10)
Source code in frequenz/dispatch/_dispatcher.py
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
|
Attributes¤
lifecycle_events
property
¤
lifecycle_events: ReceiverFetcher[DispatchEvent]
Return new, updated or deleted dispatches receiver fetcher.
RETURNS | DESCRIPTION |
---|---|
ReceiverFetcher[DispatchEvent]
|
A new receiver for new dispatches. |
running_status_change
property
¤
running_status_change: ReceiverFetcher[Dispatch]
Return running status change receiver fetcher.
This receiver will receive a message whenever the current running status of a dispatch changes.
Usually, one message per scheduled run is to be expected. However, things get complicated when a dispatch was modified:
If it was currently running and the modification now says it should not be running or running with different parameters, then a message will be sent.
In other words: Any change that is expected to make an actor start, stop or reconfigure itself with new parameters causes a message to be sent.
Note: Reaching the end time (start_time + duration) will not send a message, except when it was reached by modifying the duration.
RETURNS | DESCRIPTION |
---|---|
ReceiverFetcher[Dispatch]
|
A new receiver for dispatches whose running status changed. |
Functions¤
__init__ ¤
Initialize the dispatcher.
PARAMETER | DESCRIPTION |
---|---|
microgrid_id |
The microgrid id.
TYPE:
|
grpc_channel |
The gRPC channel.
TYPE:
|
svc_addr |
The service address.
TYPE:
|
Source code in frequenz/dispatch/_dispatcher.py
frequenz.dispatch.ReceiverFetcher ¤
Bases: Protocol[ReceivedT_co]
An interface that just exposes a new_receiver
method.
Source code in frequenz/dispatch/_dispatcher.py
Functions¤
new_receiver
abstractmethod
¤
Get a receiver from the channel.
PARAMETER | DESCRIPTION |
---|---|
name |
A name to identify the receiver in the logs.
TYPE:
|
limit |
The maximum size of the receiver.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceivedT_co]
|
A receiver instance. |
Source code in frequenz/dispatch/_dispatcher.py
frequenz.dispatch.RunningState ¤
frequenz.dispatch.Updated
dataclass
¤
A dispatch updated event.