frequenz.sdk.actor
Actors are primitive units of computation that run autonomously.
Actor Programming Model
From Wikipedia
The actor model in computer science is a mathematical model of concurrent computation that treats an actor as the basic building block of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (removing the need for lock-based synchronization).
We won't go into much more detail here because it is outside the scope of this documentation.
Frequenz SDK Actors
The Actor class serves as the foundation for creating concurrent tasks, and all actors in the SDK inherit from it. This class provides a straightforward way to implement actors. It shares similarities with the traditional actor programming model but also has some unique features:
- Message Passing: Like traditional actors, our Actor class communicates through message passing. Even though no particular message passing mechanism is enforced, the SDK actors use frequenz.channels for communication.
- Automatic Restart: If an unhandled exception occurs in an actor's logic (the _run() method), the actor will be automatically restarted. This ensures robustness in the face of errors.
- Simplified Task Management: Actors manage asynchronous tasks using asyncio. You can create and manage tasks within the actor, and the Actor class handles task cancellation and cleanup.
- Simplified Lifecycle Management: Actors are async context managers, and a run() function is also provided to easily run a group of actors and wait for them to finish.
Lifecycle
Actors are not started when they are created. There are 3 main ways to start an actor (from most to least recommended):
- By using the run() function.
- By using the actor as an async context manager.
- By using the start() method.
Warning
- If an actor raises an unhandled exception, it will be restarted automatically.
- Actors manage asyncio.Task objects, so a reference to the actor object must be held for as long as the actor is expected to be running, otherwise its tasks will be cancelled and the actor will stop. For more information, please refer to the Python asyncio documentation.
The run() Function
The run() function can start many actors at once and waits for them to finish. If any of the actors stops with errors, the errors will be logged.
Example
import asyncio
from frequenz.sdk.actor import Actor, run
class MyActor(Actor):
    async def _run(self) -> None:
        while True:
            print("Hello World!")
            await asyncio.sleep(1)
# Must be awaited from inside an async function.
await run(MyActor())  # (1)!
1. This line will block until the actor completes its execution or is manually stopped.
Async Context Manager
When an actor is used as an async context manager, it is started when the async with block is entered and stopped automatically when the block is exited (even if an unhandled exception is raised).
Example
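A minimal sketch of this start-on-enter, stop-on-exit behaviour, written with plain asyncio so it runs without the SDK installed. TinyService and its ticks counter are hypothetical stand-ins for illustration, not the SDK's Actor implementation; with the real class the same pattern is written as async with MyActor() as actor:.

```python
import asyncio


class TinyService:
    """Hypothetical stand-in for an actor; NOT the SDK's Actor class."""

    def __init__(self) -> None:
        self._task = None
        self.ticks = 0

    async def _run(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    async def __aenter__(self) -> "TinyService":
        # Entering the block starts the background task.
        self._task = asyncio.create_task(self._run())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Leaving the block (normally or through an exception) cancels the
        # task and waits for it to finish.
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def demo():
    service = TinyService()
    running_inside = False
    try:
        async with service:
            await asyncio.sleep(0.05)
            running_inside = service.is_running
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass  # the service was still stopped by __aexit__
    return running_inside, service.is_running, service.ticks


running_inside, running_after, ticks = asyncio.run(demo())
print(running_inside, running_after, ticks > 0)
```

The exception raised inside the block still propagates to the caller, but only after the background task has been cancelled and awaited.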
The start() Method
When using the start() method, the actor is started in the background and the method returns immediately. The actor will continue to run until it is manually stopped or until it completes its execution.
Example
from frequenz.sdk.actor import Actor
class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")
actor = MyActor()  # (1)!
actor.start()  # (2)!
print("The actor is running")  # (3)!
await actor.stop()  # (4)!
1. The actor is created but not started yet.
2. The actor is started manually, it keeps running in the background.
3. Danger: If this call raises an exception, the actor will never be stopped!
4. Until the actor is stopped manually.
Warning
This method is not recommended because it is easy to forget to stop the actor manually, especially in error conditions.
Communication
The Actor class doesn't impose any specific way to communicate between actors. However, frequenz.channels are always used as the communication mechanism between actors in the SDK.
Implementing an Actor
To implement an actor, you must inherit from the Actor class and implement an initializer and the abstract _run() method.
Initialization
The initializer is called when the actor is created. The Actor class initializer (__init__) should always be called first in the class we are implementing to make sure actors are properly initialized.
Actor.__init__() takes one optional argument, a name that will be used to identify the actor in logs. If no name is provided, a default name will be generated, but it is recommended that Actor subclasses also accept a name as an argument to make it easier to identify individual instances in logs.
The actor initializer normally also accepts as arguments the receivers of the input channels and the senders of the output channels that will be used for communication. These channels should be created outside the actor and passed to it as arguments to ensure actors can be composed easily.
Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor
class EchoActor(Actor):  # (1)!
    def __init__(
        self,
        receiver: Receiver[int],  # (2)!
        output: Sender[int],  # (3)!
        name: str | None = None,  # (4)!
    ) -> None:
        super().__init__(name=name)  # (5)!
        self._input: Receiver[int] = receiver  # (6)!
        self._output: Sender[int] = output  # (7)!
1. We define a new actor class called EchoActor that inherits from Actor.
2. We accept a receiver argument that will be used to receive messages from a channel.
3. We accept an output argument that will be used to send messages to a channel.
4. We accept an optional name argument that will be used to identify the actor in logs.
5. We call Actor.__init__() to make sure the actor is properly initialized.
6. We store the receiver argument in a private attribute to use it later.
7. We store the output argument in a private attribute to use it later.
The _run() Method
The abstract _run() method is called automatically by the base class when the actor is started.
Normally an actor should run forever (or until it is stopped), so it is very common to have an infinite loop in the _run() method, typically receiving messages from one or more channels (receivers), processing them and sending the results to other channels (senders).
However, it is worth noting that an actor can also be designed for a one-time execution or a limited number of runs, terminating upon completion.
Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor
class EchoActor(Actor):
    def __init__(
        self,
        receiver: Receiver[int],
        output: Sender[int],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._input: Receiver[int] = receiver
        self._output: Sender[int] = output
    async def _run(self) -> None:  # (1)!
        async for msg in self._input:  # (2)!
            await self._output.send(msg)  # (3)!
1. We implement the abstract _run() method.
2. We receive messages from the receiver one by one.
3. We send the received message to the output channel.
Stopping
By default, the stop() method will call the cancel() method (which will cancel all the tasks created by the actor) and will wait for them to finish.
This means that when an actor is stopped, the _run() method will receive a CancelledError exception. You should keep this in mind when implementing your actor and make sure to handle this exception properly if you need to do any cleanup.
The actor will handle the CancelledError exception automatically if it is not handled in the _run() method, so if there is no need for extra cleanup, you don't need to worry about it.
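As an illustration of cleanup on cancellation, here is a minimal sketch using only asyncio. The run_logic() coroutine is a hypothetical stand-in for the body of a _run() method, and the task.cancel() call only approximates what stopping an actor does to its tasks; it is not the SDK's implementation.

```python
import asyncio

events: list[str] = []


async def run_logic() -> None:
    """Hypothetical body of an actor's _run() method."""
    events.append("started")
    try:
        while True:
            # Cancellation is delivered at an await point like this one.
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("cleanup")  # e.g. flush buffers, release resources
        raise  # re-raise so the task is still reported as cancelled


async def demo() -> bool:
    task = asyncio.create_task(run_logic())
    await asyncio.sleep(0.03)
    task.cancel()  # roughly what stopping does to the running task
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(demo())
print(events, was_cancelled)
```

Re-raising CancelledError after the cleanup keeps the cancellation visible to whoever is waiting for the task.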
If an unhandled exception is raised in the _run() method, the actor will re-run the _run() method automatically. This ensures robustness in the face of errors, but you should also keep this in mind if you need to do any cleanup to make sure the re-run doesn't cause any problems.
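The restart behaviour can be pictured with a small supervisor loop. This is only a sketch under assumptions: supervise() and flaky_run() are hypothetical names, the delay value is arbitrary, and the SDK's actual restart logic (including how it uses RESTART_DELAY) may differ.

```python
import asyncio

attempts = 0


async def flaky_run() -> None:
    """Hypothetical _run() body that fails twice before succeeding."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ValueError(f"failure #{attempts}")


async def supervise(run, restart_delay: float = 0.01) -> int:
    """Re-run `run` after every unhandled exception; return the restart count."""
    restarts = 0
    while True:
        try:
            await run()
            return restarts  # finished normally: nothing left to restart
        except asyncio.CancelledError:
            raise  # cancellation means "stop", not "restart"
        except Exception:
            restarts += 1
            await asyncio.sleep(restart_delay)


restarts = asyncio.run(supervise(flaky_run))
print(attempts, restarts)
```

Because the same coroutine function is called again from the top on every restart, any state it touches has to tolerate being set up more than once.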
Tip
You could implement your own stop() method to, for example, send a message to a channel to notify that the actor should stop, or any other more graceful way to stop the actor if you need to make sure it can't be interrupted at any await point.
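One way to picture such a graceful stop is a sentinel message that the main loop checks between items. The sketch below uses an asyncio.Queue in place of an SDK channel; Worker, STOP and send() are hypothetical names for illustration and are not part of the SDK.

```python
import asyncio

STOP = object()  # hypothetical sentinel meaning "please finish"


class Worker:
    """Hypothetical stand-in for an actor with a graceful stop()."""

    def __init__(self) -> None:
        self._inbox = asyncio.Queue()
        self._task = None
        self.processed = []

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def send(self, item: int) -> None:
        await self._inbox.put(item)

    async def _run(self) -> None:
        while True:
            item = await self._inbox.get()
            if item is STOP:
                break  # leave the loop between messages, never mid-message
            await asyncio.sleep(0.01)  # simulated work that must not be cut short
            self.processed.append(item)

    async def stop(self) -> None:
        # Ask the loop to finish instead of cancelling the task.
        await self._inbox.put(STOP)
        await self._task


async def demo():
    worker = Worker()
    worker.start()
    for i in range(3):
        await worker.send(i)
    await worker.stop()
    return worker.processed


processed = asyncio.run(demo())
print(processed)
```

Since the stop request travels through the same queue as the data, every message sent before stop() is fully processed before the loop exits.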
Spawning Extra Tasks
Actors run at least one background task, created automatically by the Actor class. But Actor inherits from BackgroundService, which provides a few methods to create and manage extra tasks.
If your actor needs to spawn extra tasks, you can use BackgroundService facilities to manage the tasks, so they are also automatically stopped when the actor is stopped.
All you need to do is add the newly spawned tasks to the actor's tasks set.
Example
import asyncio
from frequenz.sdk.actor import Actor
class MyActor(Actor):
    async def _run(self) -> None:
        extra_task = asyncio.create_task(self._extra_task())  # (1)!
        self.tasks.add(extra_task)  # (2)!
        while True:  # (3)!
            print("_run() running")
            await asyncio.sleep(1)
    async def _extra_task(self) -> None:
        while True:  # (4)!
            print("_extra_task() running")
            await asyncio.sleep(1.1)
async with MyActor() as actor:  # (5)!
    await asyncio.sleep(3)  # (6)!
# (7)!
1. We create a new task using asyncio.create_task().
2. We add the task to the actor's tasks set. This ensures the task will be cancelled and cleaned up when the actor is stopped.
3. We leave the actor running forever.
4. The extra task will also run forever.
5. The actor is started.
6. We wait for 3 seconds, the actor should print a bunch of "_run() running" and "_extra_task() running" messages while it's running.
7. The actor is stopped and the extra task is cancelled automatically.
Examples
Here are a few simple but complete examples to demonstrate how to create actors and connect them using channels.
Tip
The code examples are annotated with markers (like # (1)!); each marker refers to the numbered step-by-step explanation that follows the code. The annotations are numbered according to the order of execution.
Composing actors
This example shows how to create two actors and connect them using broadcast channels.
import asyncio
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.sdk.actor import Actor
class Actor1(Actor):  # (1)!
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output
    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor1 forwarding: {msg!r}")  # (8)!
class Actor2(Actor):
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output
    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor2 forwarding: {msg!r}")  # (9)!
async def main() -> None:  # (2)!
    # (4)!
    input_channel: Broadcast[str] = Broadcast("Input to Actor1")
    middle_channel: Broadcast[str] = Broadcast("Actor1 -> Actor2 stream")
    output_channel: Broadcast[str] = Broadcast("Actor2 output")
    input_sender = input_channel.new_sender()
    output_receiver = output_channel.new_receiver()
    async with (  # (5)!
        Actor1(input_channel.new_receiver(), middle_channel.new_sender(), "actor1"),
        Actor2(middle_channel.new_receiver(), output_channel.new_sender(), "actor2"),
    ):
        await input_sender.send("Hello")  # (6)!
        msg = await output_receiver.receive()  # (7)!
        print(msg)  # (10)!
    # (11)!
if __name__ == "__main__":  # (3)!
    asyncio.run(main())
1. We define 2 actors: Actor1 and Actor2 that will just forward a message from an input channel to an output channel, adding some text.
2. We define an async main() function with the main logic of our asyncio program.
3. We start the main() function in the async loop using asyncio.run().
4. We create a bunch of broadcast channels to connect our actors.
   - input_channel is the input channel for Actor1.
   - middle_channel is the channel that connects Actor1 and Actor2.
   - output_channel is the output channel for Actor2.
5. We create two actors and use them as async context managers, Actor1 and Actor2, and connect them by creating new senders and receivers from the channels.
   Note
   We don't use the run() function here because we want to stop the actors when we are done with them, but the actors will run forever (as long as the channel is not closed). So the async context manager is a better fit for this example.
6. We schedule the sending of the message Hello to Actor1 via input_channel.
7. We receive (await) the response from Actor2 via output_channel. Between this and the previous steps the async calls in the actors will be executed.
8. Actor1 sends the re-formatted message (Actor1 forwarding: 'Hello') to Actor2 via the middle_channel.
9. Actor2 sends the re-formatted message (Actor2 forwarding: "Actor1 forwarding: 'Hello'") to the output_channel.
10. Finally, we print the received message, which will still be Actor2 forwarding: "Actor1 forwarding: 'Hello'".
11. The actors are stopped and cleaned up automatically when the async with block ends.
The expected output is:
Actor2 forwarding: "Actor1 forwarding: 'Hello'"
Receiving from multiple channels
This example shows how to create an actor that receives messages from multiple broadcast channels using select().
import asyncio
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.sdk.actor import Actor, run
class EchoActor(Actor):  # (1)!
    def __init__(
        self,
        receiver_1: Receiver[bool],
        receiver_2: Receiver[bool],
        output: Sender[bool],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver_1 = receiver_1
        self._receiver_2 = receiver_2
        self._output = output
    async def _run(self) -> None:  # (2)!
        async for selected in select(self._receiver_1, self._receiver_2):  # (10)!
            if selected_from(selected, self._receiver_1):  # (11)!
                print(f"Received from receiver_1: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value:  # (12)!
                    break
            elif selected_from(selected, self._receiver_2):  # (13)!
                print(f"Received from receiver_2: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value:  # (14)!
                    break
            else:
                assert False, "Unknown selected channel"
        print("EchoActor finished")
    # (15)!
# (3)!
input_channel_1 = Broadcast[bool]("input_channel_1")
input_channel_2 = Broadcast[bool]("input_channel_2")
echo_channel = Broadcast[bool]("echo_channel")
echo_actor = EchoActor(  # (4)!
    input_channel_1.new_receiver(),
    input_channel_2.new_receiver(),
    echo_channel.new_sender(),
    "echo-actor",
)
echo_receiver = echo_channel.new_receiver()  # (5)!
async def main() -> None:  # (6)!
    # (8)!
    await input_channel_1.new_sender().send(True)
    await input_channel_2.new_sender().send(False)
    await run(echo_actor)  # (9)!
    await echo_channel.close()  # (16)!
    async for message in echo_receiver:  # (17)!
        print(f"Received {message=}")
if __name__ == "__main__":  # (7)!
    asyncio.run(main())
1. We define an EchoActor that receives messages from two channels and sends them to another channel.
2. We implement the _run() method that will receive messages from the two channels using select() and send them to the output channel. The _run() method will stop if a False message is received.
3. We create the channels that will be used with the actor.
4. We create the actor and connect it to the channels by creating new receivers and senders from the channels.
5. We create a receiver for the echo_channel to eventually receive the messages sent by the actor.
6. We define the main() function that will run the actor.
7. We start the main() function in the async loop using asyncio.run().
8. We send a message to each of the input channels. These messages will be queued in the channels until they are consumed by the actor.
9. We start the actor and wait for it to finish using the run() function.
10. The select() function will get the first message available from the two channels. The order in which they will be handled is unknown, but in this example we assume that the first message will be from input_channel_1 (True) and the second from input_channel_2 (False).
11. The selected_from() function will return True for the input_channel_1 receiver. selected.value holds the received message, so "Received from receiver_1: True" will be printed and True will be sent to the output channel.
12. Since selected.value is True, the loop will continue, going back to the select() function.
13. The selected_from() function will return False for the input_channel_1 receiver and True for the input_channel_2 receiver. The message stored in selected.value will now be False, so "Received from receiver_2: False" will be printed and False will be sent to the output channel.
14. Since selected.value is False, the loop will break.
15. The _run() method will finish normally and the actor will be stopped, so the run() function will return.
16. We close the echo_channel to make sure the echo_receiver will stop receiving messages after all the queued messages are consumed (otherwise step 17 will never end!).
17. We receive the messages sent by the actor to the echo_channel one by one and print them. It should print first Received message=True and then Received message=False.
The expected output is:
Received from receiver_1: True
Received from receiver_2: False
EchoActor finished
Received message=True
Received message=False
Classes
frequenz.sdk.actor.Actor
Bases: BackgroundService, ABC
A primitive unit of computation that runs autonomously.
To implement an actor, subclasses must implement the _run() method, which should run the actor's logic. The _run() method is called by the base class when the actor is started, and is expected to run until the actor is stopped.
Info
Please read the actor module documentation for a more comprehensive guide on how to use and implement actors properly.
Source code in frequenz/sdk/actor/_actor.py
Attributes
RESTART_DELAY class-attribute instance-attribute
The delay to wait between restarts of this actor.
is_running property
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION
---|---
bool | Whether this background service is running.
name property
name: str
The name of this background service.
RETURNS | DESCRIPTION
---|---
str | The name of this background service.
tasks property
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION
---|---
Set[Task[Any]] | The set of running tasks spawned by this background service.
Functions
__aenter__ async
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION
---|---
Self | This background service.
__aexit__ async
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION
---|---
exc_type | The type of the exception raised, if any. TYPE: type[BaseException] \| None
exc_val | The exception raised, if any. TYPE: BaseException \| None
exc_tb | The traceback of the exception raised, if any. TYPE: TracebackType \| None
Source code in frequenz/sdk/actor/_background_service.py
__await__
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION
---|---
None | An implementation-specific generator for the awaitable.
Source code in frequenz/sdk/actor/_background_service.py
__del__
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(*, name: str | None = None) -> None
Initialize this BackgroundService.
PARAMETER | DESCRIPTION |
---|---|
name
|
The name of this background service. If
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__repr__
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION
---|---
str | A string representation of this instance.
__str__
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION
---|---
str | A string representation of this instance.
cancel
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION
---|---
msg | The message to be passed to the tasks being cancelled. TYPE: str \| None, DEFAULT: None
Source code in frequenz/sdk/actor/_background_service.py
start
Start this actor.
If this actor is already running, this method does nothing.
stop async
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION
---|---
msg | The message to be passed to the tasks being cancelled. TYPE: str \| None, DEFAULT: None
RAISES | DESCRIPTION
---|---
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception.
Source code in frequenz/sdk/actor/_background_service.py
wait async
Wait for this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/actor/_background_service.py
frequenz.sdk.actor.BackgroundService
Bases: ABC
A background service that can be started and stopped.
A background service is a service that runs in the background spawning one or more tasks. The service can be started and stopped and can work as an async context manager to provide deterministic cleanup.
To implement a background service, subclasses must implement the start() method, which should start the background tasks needed by the service, and add them to the _tasks protected attribute.
If you need to collect results or handle exceptions of the tasks when stopping the service, then you need to also override the stop() method, as the base implementation does not collect any results and re-raises all exceptions.
Warning
As background services manage asyncio.Task objects, a reference to them must be held for as long as the background service is expected to be running, otherwise its tasks will be cancelled and the service will stop. For more information, please refer to the Python asyncio documentation.
Example
import asyncio
import datetime
from frequenz.sdk.actor import BackgroundService
class Clock(BackgroundService):
    def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self._resolution_s = resolution_s
    def start(self) -> None:
        # Register the task so it is cancelled when the service is stopped.
        self._tasks.add(asyncio.create_task(self._tick()))
    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self._resolution_s)
            print(datetime.datetime.now())
async def main() -> None:
    # As an async context manager
    async with Clock(resolution_s=1):
        await asyncio.sleep(5)
    # Manual start/stop (only use if necessary, as cleanup is more complicated)
    clock = Clock(resolution_s=1)
    clock.start()
    await asyncio.sleep(5)
    await clock.stop()
asyncio.run(main())
Source code in frequenz/sdk/actor/_background_service.py
Attributes
is_running property
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION
---|---
bool | Whether this background service is running.
name property
name: str
The name of this background service.
RETURNS | DESCRIPTION
---|---
str | The name of this background service.
tasks property
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION
---|---
Set[Task[Any]] | The set of running tasks spawned by this background service.
Functions
__aenter__ async
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION
---|---
Self | This background service.
__aexit__ async
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION
---|---
exc_type | The type of the exception raised, if any. TYPE: type[BaseException] \| None
exc_val | The exception raised, if any. TYPE: BaseException \| None
exc_tb | The traceback of the exception raised, if any. TYPE: TracebackType \| None
Source code in frequenz/sdk/actor/_background_service.py
__await__
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION
---|---
None | An implementation-specific generator for the awaitable.
Source code in frequenz/sdk/actor/_background_service.py
__del__
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(*, name: str | None = None) -> None
Initialize this BackgroundService.
PARAMETER | DESCRIPTION |
---|---|
name
|
The name of this background service. If
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__repr__
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION
---|---
str | A string representation of this instance.
__str__
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION
---|---
str | A string representation of this instance.
cancel
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION
---|---
msg | The message to be passed to the tasks being cancelled. TYPE: str \| None, DEFAULT: None
Source code in frequenz/sdk/actor/_background_service.py
start abstractmethod
stop async
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION
---|---
msg | The message to be passed to the tasks being cancelled. TYPE: str \| None, DEFAULT: None
RAISES | DESCRIPTION
---|---
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception.
Source code in frequenz/sdk/actor/_background_service.py
wait async
Wait for this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/actor/_background_service.py
frequenz.sdk.actor.ResamplerConfig dataclass
Resampler configuration.
Source code in frequenz/sdk/timeseries/_resampling.py
Attributes
align_to class-attribute instance-attribute
align_to: datetime | None = UNIX_EPOCH
The time to align the resampling period to.
The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.
If align_to is None, the resampling period will be aligned to the time the resampler is created.
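To make the alignment rule concrete, here is a small sketch of the arithmetic using only the standard library. first_aligned_time() is a hypothetical helper, not an SDK function, and it assumes that a moment lying exactly on the grid counts as aligned; the resampler's own handling of that edge case may differ.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def first_aligned_time(
    now: datetime, align_to: datetime, resampling_period: timedelta
) -> datetime:
    """Return the first time at or after `now` on the alignment grid.

    Hypothetical helper for illustration only.
    """
    # Python's % on timedeltas gives a non-negative remainder for a positive
    # period, so this also works when align_to lies in the future.
    offset = (now - align_to) % resampling_period
    if offset == timedelta(0):
        return now
    return now + (resampling_period - offset)


now = datetime(2024, 1, 1, 12, 0, 7, tzinfo=timezone.utc)
period = timedelta(seconds=15)
aligned_to_epoch = first_aligned_time(now, UNIX_EPOCH, period)
print(aligned_to_epoch)  # 2024-01-01 12:00:15+00:00

future = datetime(2024, 1, 1, 13, 0, 5, tzinfo=timezone.utc)
aligned_to_future = first_aligned_time(now, future, period)
print(aligned_to_future)  # 2024-01-01 12:00:20+00:00
```

In the second call the grid is anchored at 13:00:05, so grid points fall on seconds 05, 20, 35 and 50 of every minute, including the minutes before the anchor.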
initial_buffer_len class-attribute instance-attribute
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
The initial length of the resampling buffer.
The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.
It must be at least 1 and at most max_buffer_len.
max_buffer_len class-attribute instance-attribute
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
The maximum length of the resampling buffer.
Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.
It must be bigger than warn_buffer_len.
max_data_age_in_periods class-attribute instance-attribute
max_data_age_in_periods: float = 3.0
The maximum age a sample can have to be considered relevant for resampling.
Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).
It must be bigger than 1.0.
Example
If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.
If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
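The two cases above boil down to multiplying the longer of the two periods by max_data_age_in_periods. The sketch below reproduces that arithmetic; max_data_age() is a hypothetical helper written for this illustration and is not part of the SDK.

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are discarded (hypothetical helper)."""
    # Downsampling: the resampling period is the longer one.
    # Upsampling: the input sampling period is the longer one.
    period = max(resampling_period, input_period)
    return period * max_data_age_in_periods


downsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2)
upsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2)
print(downsampling.total_seconds(), upsampling.total_seconds())  # 6.0 10.0
```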
resampling_function class-attribute instance-attribute
The resampling function.
This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.
resampling_period instance-attribute
resampling_period: timedelta
The resampling period.
This is the time that passes between consecutive calculations of resampled data.
It must be a positive time span.
warn_buffer_len class-attribute instance-attribute
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
The minimum length of the resampling buffer that will emit a warning.
If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.
It must be at least 1 and at most max_buffer_len.
Functions
__post_init__
Check that config values are valid.
RAISES | DESCRIPTION
---|---
ValueError | If any value is out of range.
Source code in frequenz/sdk/timeseries/_resampling.py
Functions
frequenz.sdk.actor.run async
run(*actors: Actor) -> None
Await the completion of all actors.
Info
Please read the actor module documentation for a more comprehensive guide on how to use and implement actors properly.
PARAMETER | DESCRIPTION
---|---
*actors | The actors to be awaited. TYPE: Actor