dispatch
frequenz.dispatch ¤
A highlevel interface for the dispatch API.
A small overview of the most important classes in this module:
- Dispatcher: The entry point for the API.
- Dispatch: A dispatch type with lots of useful extra functionality.
- ActorDispatcher: A service to manage other actors based on incoming dispatches.
- Created, Updated, Deleted: Dispatch event types.
Attributes¤
frequenz.dispatch.DispatchEvent
module-attribute
¤
Type that is sent over the channel for dispatch updates.
This type is used to send dispatches that were created, updated or deleted over the channel.
Classes¤
frequenz.dispatch.ActorDispatcher ¤
Bases: BackgroundService
Helper class to manage actors based on dispatches.
Example usage:
import os
import asyncio
from typing import override
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.dispatch.types import TargetComponents
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
from frequenz.sdk.actor import Actor, run
class MyActor(Actor):
def __init__(
self,
*,
name: str | None = None,
) -> None:
super().__init__(name=name)
self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None
self._dry_run: bool = False
self._options: dict[str, Any] = {}
@classmethod
def new_with_dispatch(
cls,
initial_dispatch: DispatchInfo,
dispatch_updates_receiver: Receiver[DispatchInfo],
*,
name: str | None = None,
) -> "Self":
self = cls(name=name)
self._dispatch_updates_receiver = dispatch_updates_receiver
self._update_dispatch_information(initial_dispatch)
return self
@override
async def _run(self) -> None:
other_recv: Receiver[Any] = ...
if self._dispatch_updates_receiver is None:
async for msg in other_recv:
# do stuff
...
else:
await self._run_with_dispatch(other_recv)
async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
async for selected in select(self._dispatch_updates_receiver, other_recv):
if selected_from(selected, self._dispatch_updates_receiver):
self._update_dispatch_information(selected.message)
elif selected_from(selected, other_recv):
# do stuff
...
else:
assert False, f"Unexpected selected receiver: {selected}"
def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None:
print("Received update:", dispatch_update)
self._dry_run = dispatch_update.dry_run
self._options = dispatch_update.options
match dispatch_update.components:
case []:
print("Dispatch: Using all components")
case list() as ids if isinstance(ids[0], int):
component_ids = ids
case [ComponentCategory.BATTERY, *_]:
component_category = ComponentCategory.BATTERY
case unsupported:
print(
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)
async def main():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
) as dispatcher:
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)
await run(managing_actor)
Source code in frequenz/dispatch/_actor_dispatcher.py
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
|
Attributes¤
is_running
property
¤
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether this background service is running. |
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str
|
The name of this background service. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]]
|
The set of running tasks spawned by this background service. |
Classes¤
FailedDispatchesRetrier ¤
Bases: BackgroundService
Manages the retring of failed dispatches.
Source code in frequenz/dispatch/_actor_dispatcher.py
property
¤is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether this background service is running. |
property
¤name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str
|
The name of this background service. |
property
¤Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]]
|
The set of running tasks spawned by this background service. |
async
¤__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self
|
This background service. |
async
¤__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__(retry_interval: timedelta) -> None
Initialize the retry manager.
PARAMETER | DESCRIPTION |
---|---|
retry_interval
|
The interval between retries.
TYPE:
|
Source code in frequenz/dispatch/_actor_dispatcher.py
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
retry(dispatch: Dispatch) -> None
Retry a dispatch.
PARAMETER | DESCRIPTION |
---|---|
dispatch
|
The dispatch information to retry.
TYPE:
|
Source code in frequenz/dispatch/_actor_dispatcher.py
async
¤stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
async
¤Wait this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/actor/_background_service.py
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self
|
This background service. |
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
__del__ ¤
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(
actor_factory: Callable[
[DispatchInfo, Receiver[DispatchInfo]],
Awaitable[Actor],
],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: (
Callable[[Dispatch], int] | None
) = None,
retry_interval: timedelta = timedelta(seconds=60),
) -> None
Initialize the dispatch handler.
PARAMETER | DESCRIPTION |
---|---|
actor_factory
|
A callable that creates an actor with some initial dispatch information.
TYPE:
|
running_status_receiver
|
The receiver for dispatch running status changes. |
dispatch_identity
|
A function to identify to which actor a dispatch refers. By default, it uses the dispatch ID. |
retry_interval
|
The interval between retries. |
Source code in frequenz/dispatch/_actor_dispatcher.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
start ¤
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
wait
async
¤
Wait this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/actor/_background_service.py
frequenz.dispatch.Created
dataclass
¤
A dispatch created event.
Source code in frequenz/dispatch/_event.py
frequenz.dispatch.Deleted
dataclass
¤
A dispatch deleted event.
Source code in frequenz/dispatch/_event.py
frequenz.dispatch.Dispatch
dataclass
¤
Bases: Dispatch
Dispatch type with extra functionality.
Source code in frequenz/dispatch/_dispatch.py
Attributes¤
active
instance-attribute
¤
active: bool
Indicates whether the dispatch is active and eligible for processing.
create_time
instance-attribute
¤
create_time: datetime
The creation time of the dispatch in UTC. Set when a dispatch is created.
dry_run
instance-attribute
¤
dry_run: bool
Indicates if the dispatch is a dry run.
Executed for logging and monitoring without affecting actual component states.
duration
instance-attribute
¤
duration: timedelta | None
The duration of the dispatch, represented as a timedelta.
next_run
property
¤
next_run: datetime | None
Calculate the next run of a dispatch.
RETURNS | DESCRIPTION |
---|---|
datetime | None
|
The next run of the dispatch or None if the dispatch is finished. |
payload
instance-attribute
¤
The dispatch payload containing arbitrary data.
It is structured as needed for the dispatch operation.
recurrence
instance-attribute
¤
The recurrence rule for the dispatch.
Defining any repeating patterns or schedules.
started
property
¤
started: bool
Check if the dispatch is started.
RETURNS | DESCRIPTION |
---|---|
bool
|
True if the dispatch is started, False otherwise. |
type
instance-attribute
¤
type: str
User-defined information about the type of dispatch.
This is understood and processed by downstream applications.
until
property
¤
until: datetime | None
Time when the dispatch should end.
Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.
RETURNS | DESCRIPTION |
---|---|
datetime | None
|
The time when the dispatch should end or None if the dispatch is not running. |
update_time
instance-attribute
¤
update_time: datetime
The last update time of the dispatch in UTC. Set when a dispatch is modified.
Functions¤
__init__ ¤
Initialize the dispatch.
PARAMETER | DESCRIPTION |
---|---|
client_dispatch
|
The client dispatch.
TYPE:
|
deleted
|
Whether the dispatch is deleted.
TYPE:
|
Source code in frequenz/dispatch/_dispatch.py
from_protobuf
classmethod
¤
from_protobuf(pb_object: Dispatch) -> Dispatch
Convert a protobuf dispatch to a dispatch.
PARAMETER | DESCRIPTION |
---|---|
pb_object
|
The protobuf dispatch to convert.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Dispatch
|
The converted dispatch. |
Source code in frequenz/client/dispatch/types.py
missed_runs ¤
Yield all missed runs of a dispatch.
Yields all missed runs of a dispatch.
PARAMETER | DESCRIPTION |
---|---|
since
|
The time to start checking for missed runs.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Iterator[datetime]
|
A generator that yields all missed runs of a dispatch. |
YIELDS | DESCRIPTION |
---|---|
datetime
|
The missed run.
TYPE::
|
Source code in frequenz/dispatch/_dispatch.py
next_run_after ¤
Calculate the next run of a dispatch.
PARAMETER | DESCRIPTION |
---|---|
after
|
The time to calculate the next run from.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
datetime | None
|
The next run of the dispatch or None if the dispatch is finished. |
Source code in frequenz/client/dispatch/types.py
to_protobuf ¤
Convert a dispatch to a protobuf dispatch.
RETURNS | DESCRIPTION |
---|---|
Dispatch
|
The converted protobuf dispatch. |
Source code in frequenz/client/dispatch/types.py
frequenz.dispatch.DispatchInfo
dataclass
¤
Event emitted when the dispatch changes.
Source code in frequenz/dispatch/_actor_dispatcher.py
frequenz.dispatch.Dispatcher ¤
Bases: BackgroundService
A highlevel interface for the dispatch API.
This class provides a highlevel interface to the dispatch API. It provides receivers for various events and management of actors based on dispatches.
The receivers shortly explained:
- Lifecycle events receiver: Receives an event whenever a dispatch is created, updated or deleted.
-
Running status change receiver: Receives an event whenever the running status of a dispatch changes. The running status of a dispatch can change due to a variety of reasons, such as but not limited to the dispatch being started, stopped, modified or deleted or reaching its scheduled start or end time.
Any change that could potentially require the consumer to start, stop or reconfigure itself will cause a message to be sent.
Managing an actor
import os
from frequenz.dispatch import Dispatcher, MergeByType
from unittest.mock import MagicMock
async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
return MagicMock(dispatch=dispatch, receiver=receiver)
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
) as dispatcher:
dispatcher.start_managing(
dispatch_type="DISPATCH_TYPE",
actor_factory=create_actor,
merge_strategy=MergeByType(),
)
await dispatcher
Processing running state change dispatches
import os
from frequenz.dispatch import Dispatcher
from unittest.mock import MagicMock
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
) as dispatcher:
actor = MagicMock() # replace with your actor
rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
async for dispatch in rs_receiver:
if dispatch.started:
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
if actor.is_running:
actor.reconfigure(
components=dispatch.target,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
) # this will reconfigure the actor
else:
# this will start a new actor with the given components
# and run it for the duration of the dispatch
actor.start(
components=dispatch.target,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
)
else:
actor.stop() # this will stop the actor
Getting notification about dispatch lifecycle events
import os
from typing import assert_never
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key,
) as dispatcher:
events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
async for event in events_receiver:
match event:
case Created(dispatch):
print(f"A dispatch was created: {dispatch}")
case Deleted(dispatch):
print(f"A dispatch was deleted: {dispatch}")
case Updated(dispatch):
print(f"A dispatch was updated: {dispatch}")
case _ as unhandled:
assert_never(unhandled)
Creating a new dispatch and then modifying it.
Note that this uses the lower-level Client
class to create and update the dispatch.
import os
from datetime import datetime, timedelta, timezone
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.dispatch import Dispatcher
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key,
) as dispatcher:
# Create a new dispatch
new_dispatch = await dispatcher.client.create(
microgrid_id=microgrid_id,
type="ECHO_FREQUENCY", # replace with your own type
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
duration=timedelta(minutes=5),
target=ComponentCategory.INVERTER,
payload={"font": "Times New Roman"}, # Arbitrary payload data
)
# Modify the dispatch
await dispatcher.client.update(
microgrid_id=microgrid_id,
dispatch_id=new_dispatch.id,
new_fields={"duration": timedelta(minutes=10)}
)
# Validate the modification
modified_dispatch = await dispatcher.client.get(
microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
)
assert modified_dispatch.duration == timedelta(minutes=10)
Source code in frequenz/dispatch/_dispatcher.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 |
|
Attributes¤
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str
|
The name of this background service. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]]
|
The set of running tasks spawned by this background service. |
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self
|
This background service. |
Source code in frequenz/dispatch/_dispatcher.py
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
__del__ ¤
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
Initialize the dispatcher.
PARAMETER | DESCRIPTION |
---|---|
microgrid_id
|
The microgrid id.
TYPE:
|
server_url
|
The URL of the dispatch service.
TYPE:
|
key
|
The key to access the service.
TYPE:
|
Source code in frequenz/dispatch/_dispatcher.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
is_managed ¤
Check if the dispatcher is managing actors for a given dispatch type.
PARAMETER | DESCRIPTION |
---|---|
dispatch_type
|
The type of the dispatch to check.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
bool
|
True if the dispatcher is managing actors for the given dispatch type. |
Source code in frequenz/dispatch/_dispatcher.py
new_lifecycle_events_receiver ¤
new_lifecycle_events_receiver(
dispatch_type: str,
) -> Receiver[DispatchEvent]
Return new, updated or deleted dispatches receiver.
PARAMETER | DESCRIPTION |
---|---|
dispatch_type
|
The type of the dispatch to listen for.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[DispatchEvent]
|
A new receiver for new dispatches. |
Source code in frequenz/dispatch/_dispatcher.py
new_running_state_event_receiver
async
¤
new_running_state_event_receiver(
dispatch_type: str,
*,
merge_strategy: MergeStrategy | None = None
) -> Receiver[Dispatch]
Return running state event receiver.
This receiver will receive a message whenever the current running status of a dispatch changes.
Usually, one message per scheduled run is to be expected. However, things get complicated when a dispatch was modified:
If it was currently running and the modification now says it should not be running or running with different parameters, then a message will be sent.
In other words: Any change that is expected to make an actor start, stop or adjust itself according to new dispatch options causes a message to be sent.
merge_strategy
is an instance of a class derived from
MergeStrategy
Available strategies
are:
MergeByType
— merges all dispatches of the same typeMergeByTypeTarget
— merges all dispatches of the same type and targetNone
— no merging, just send all events (default)
Running intervals from multiple dispatches will be merged, according to the chosen strategy.
While merging, stop events are ignored as long as at least one merge-criteria-matching dispatch remains active.
PARAMETER | DESCRIPTION |
---|---|
dispatch_type
|
The type of the dispatch to listen for.
TYPE:
|
merge_strategy
|
The type of the strategy to merge running intervals.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[Dispatch]
|
A new receiver for dispatches whose running status changed. |
Source code in frequenz/dispatch/_dispatcher.py
start ¤
start_managing
async
¤
start_managing(
dispatch_type: str,
*,
actor_factory: Callable[
[DispatchInfo, Receiver[DispatchInfo]],
Awaitable[Actor],
],
merge_strategy: MergeStrategy | None = None,
retry_interval: timedelta = timedelta(seconds=60)
) -> None
Manage actors for a given dispatch type.
Creates and manages an
ActorDispatcher
for the given type that will
start, stop and reconfigure actors based on received dispatches.
You can await the Dispatcher
instance to block until all types
registered with start_managing()
are stopped using
stop_managing()
"Merging" means that when multiple dispatches are active at the same time, the intervals are merged into one.
This also decides how instances are mapped from dispatches to actors:
MergeByType
— All dispatches map to one single instance identified by the dispatch type.MergeByTypeTarget
— A dispatch maps to an instance identified by the dispatch type and target. So different dispatches with equal type and target will map to the same instance.None
— No merging, each dispatch maps to a separate instance.
PARAMETER | DESCRIPTION |
---|---|
dispatch_type
|
The type of the dispatch to manage.
TYPE:
|
actor_factory
|
The factory to create actors.
TYPE:
|
merge_strategy
|
The strategy to merge running intervals.
TYPE:
|
retry_interval
|
Retry interval for when actor creation fails. |
Source code in frequenz/dispatch/_dispatcher.py
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
stop_managing
async
¤
stop_managing(dispatch_type: str) -> None
Stop managing actors for a given dispatch type.
PARAMETER | DESCRIPTION |
---|---|
dispatch_type
|
The type of the dispatch to stop managing.
TYPE:
|
Source code in frequenz/dispatch/_dispatcher.py
wait
async
¤
Wait until all actor dispatches are stopped.
wait_for_initialization
async
¤
frequenz.dispatch.MergeByType ¤
Bases: MergeStrategy
Merge running intervals based on the dispatch type.
Source code in frequenz/dispatch/_merge_strategies.py
Functions¤
filter ¤
Filter dispatches based on the merge strategy.
Keeps start events. Keeps stop events only if no other dispatches matching the strategy's criteria are running.
Source code in frequenz/dispatch/_merge_strategies.py
frequenz.dispatch.MergeByTypeTarget ¤
Bases: MergeByType
Merge running intervals based on the dispatch type and target.
Source code in frequenz/dispatch/_merge_strategies.py
Functions¤
filter ¤
Filter dispatches based on the merge strategy.
Keeps start events. Keeps stop events only if no other dispatches matching the strategy's criteria are running.
Source code in frequenz/dispatch/_merge_strategies.py
identity ¤
frequenz.dispatch.MergeStrategy ¤
Bases: ABC
Base class for strategies to merge running intervals.
Source code in frequenz/dispatch/_bg_service.py
Functions¤
filter
abstractmethod
¤
Filter dispatches based on the strategy.
PARAMETER | DESCRIPTION |
---|---|
dispatches
|
All dispatches, available as context. |
dispatch
|
The dispatch to filter.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
bool
|
True if the dispatch should be included, False otherwise. |
Source code in frequenz/dispatch/_bg_service.py
frequenz.dispatch.Updated
dataclass
¤
A dispatch updated event.