Actors¤
An actor is a primitive unit of computation that runs autonomously.
Actor Programming Model¤
From Wikipedia
The actor model in computer science is a mathematical model of concurrent computation that treats an actor as the basic building block of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (removing the need for lock-based synchronization).
We won't get into much more detail here because it is outside the scope of this documentation. However, if you are interested in learning more about the actor programming model, here are some useful resources:
Frequenz SDK Actors¤
The Actor class serves as the foundation for creating concurrent tasks, and all actors in the SDK inherit from it. This class provides a straightforward way to implement actors. It shares similarities with the traditional actor programming model but also has some unique features:
- Message Passing: Like traditional actors, our Actor class communicates through message passing. Although no particular message passing mechanism is enforced, the SDK actors use frequenz.channels for communication.
- Automatic Restart: If an unhandled exception occurs in an actor's logic (the _run() method), the actor will be automatically restarted. This ensures robustness in the face of errors.
- Simplified Task Management: Actors manage asynchronous tasks using asyncio. You can create and manage tasks within the actor, and the Actor class handles task cancellation and cleanup.
- Simplified Lifecycle Management: Actors are async context managers, and a run() function is also provided to easily run a group of actors and wait for them to finish.
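The automatic restart behaviour can be pictured with a small standard-library sketch. This is only an illustration of the idea, not the SDK's implementation: `run_with_restart`, `flaky_logic` and the `max_restarts` limit are hypothetical names introduced here so that the example terminates.

```python
import asyncio


async def run_with_restart(logic, max_restarts: int) -> int:
    """Run `logic()` and start it again whenever it raises an exception.

    Returns the number of restarts performed. `max_restarts` is a
    hypothetical safety limit added only to keep this sketch finite.
    """
    restarts = 0
    while True:
        try:
            await logic()
            return restarts  # Finished normally: no restart needed.
        except asyncio.CancelledError:
            raise  # Cancellation means "stop", so it is never retried.
        except Exception as error:
            if restarts >= max_restarts:
                raise
            restarts += 1
            print(f"logic failed with {error!r}, restarting ({restarts})")


attempts = 0


async def flaky_logic() -> None:
    """Fail on the first two attempts, then succeed."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError(f"attempt {attempts} failed")


restart_count = asyncio.run(run_with_restart(flaky_logic, max_restarts=5))
print(f"finished after {restart_count} restarts")
```

Note that the logic starts from scratch on every restart, so any state it needs to keep across failures has to live outside of it.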
Lifecycle¤
Actors are not started when they are created. There are 3 main ways to start an actor (from most to least recommended):
- By using the run() function.
- By using the actor as an async context manager.
- By using the start() method.
Warning
- If an actor raises an unhandled exception, it will be restarted automatically.
- Actors manage asyncio.Task objects, so a reference to the actor object must be held for as long as the actor is expected to be running; otherwise its tasks will be cancelled and the actor will stop. For more information, please refer to the Python asyncio documentation.
The run() Function¤
The run() function can start many actors at once and waits for them to finish. If any of the actors stops with errors, the errors will be logged.
Example
import asyncio
from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
    async def _run(self) -> None:
        while True:
            print("Hello World!")
            await asyncio.sleep(1)

await run(MyActor()) # (1)!
- This line will block until the actor completes its execution or is manually stopped.
Async Context Manager¤
When an actor is used as an async context manager, it is started when the async with block is entered and stopped automatically when the block is exited (even if an unhandled exception is raised).
Example
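The start-on-enter, stop-on-exit behaviour described above can be sketched with the standard library alone. `TinyService` is a hypothetical stand-in written for this illustration; it is not the SDK's Actor class and makes no claim about how Actor is implemented.

```python
import asyncio


class TinyService:
    """Hypothetical stand-in for an actor; not the SDK's `Actor` class."""

    def __init__(self) -> None:
        self.ticks = 0
        self._task: asyncio.Task[None] | None = None

    async def _run(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    async def __aenter__(self) -> "TinyService":
        self._task = asyncio.create_task(self._run())  # Started on entry.
        return self

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        assert self._task is not None
        self._task.cancel()  # Stopped on exit, even after an exception.
        await asyncio.gather(self._task, return_exceptions=True)

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()


async def main() -> tuple[bool, bool]:
    service = TinyService()
    running_inside = False
    try:
        async with service:
            await asyncio.sleep(0.05)
            running_inside = service.is_running
            raise RuntimeError("something went wrong inside the block")
    except RuntimeError:
        pass  # The service has already been stopped by `__aexit__`.
    return running_inside, service.is_running


running_inside, running_after = asyncio.run(main())
print(running_inside, running_after)  # True False
```

The exception raised inside the block still propagates to the caller, but only after the background task has been cancelled and awaited.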
The start() Method¤
When using the start() method, the actor is started in the background and the method returns immediately. The actor will continue to run until it is manually stopped or until it completes its execution.
Example
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")

actor = MyActor() # (1)!
actor.start() # (2)!
print("The actor is running") # (3)!
await actor.stop() # (4)!
- The actor is created but not started yet.
- The actor is started manually and keeps running in the background.
- Danger: If this function raises an exception, the actor will never be stopped!
- The actor keeps running until it is stopped manually.
Warning
This method is not recommended because it is easy to forget to stop the actor manually, especially in error conditions.
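If manual starting is unavoidable, one common way to reduce the risk described in this warning is to pair the start with a try/finally block. The sketch below uses a hypothetical `TinyService` class with start() and stop() methods, written with the standard library only; it is not the SDK's Actor, and the pattern is a general asyncio habit rather than something this documentation prescribes.

```python
import asyncio


class TinyService:
    """Hypothetical stand-in with `start()` and `stop()`; not the SDK's `Actor`."""

    def __init__(self) -> None:
        self._task: asyncio.Task[None] | None = None

    def start(self) -> None:
        # Returns immediately; the work continues in the background.
        self._task = asyncio.create_task(asyncio.sleep(3600))

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()


async def use_service(service: TinyService) -> None:
    service.start()
    try:
        raise RuntimeError("failure while the service is running")
    finally:
        await service.stop()  # Runs on success and on error alike.


async def main() -> bool:
    service = TinyService()
    try:
        await use_service(service)
    except RuntimeError:
        pass
    return service.is_running


still_running = asyncio.run(main())
print(still_running)  # False
```

This is essentially what an async context manager does for you, which is why that approach is listed as preferable.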
Communication¤
The Actor class doesn't impose any specific way to communicate between actors. However, frequenz.channels are always used as the communication mechanism between actors in the SDK.
Implementing an Actor¤
To implement an actor, you must inherit from the Actor class and implement an initializer and the abstract _run() method.
Initialization¤
The initializer is called when the actor is created. The Actor class initializer (__init__) should always be called first in the class being implemented to make sure actors are properly initialized.
Actor.__init__() takes one optional argument, a name that will be used to identify the actor in logs. If no name is provided, a default name will be generated, but it is recommended that Actor subclasses also accept a name argument to make it easier to identify individual instances in logs.
The actor initializer normally also accepts as arguments the receivers of its input channels and the senders of its output channels. These channels should be created outside the actor and passed to it as arguments so that actors can be composed easily.
Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor

class EchoActor(Actor): # (1)!
    def __init__(
        self,
        receiver: Receiver[int], # (2)!
        output: Sender[int], # (3)!
        name: str | None = None, # (4)!
    ) -> None:
        super().__init__(name=name) # (5)!
        self._input: Receiver[int] = receiver # (6)!
        self._output: Sender[int] = output # (7)!
- We define a new actor class called EchoActor that inherits from Actor.
- We accept a receiver argument that will be used to receive messages from a channel.
- We accept an output argument that will be used to send messages to a channel.
- We accept an optional name argument that will be used to identify the actor in logs.
- We call Actor.__init__() to make sure the actor is properly initialized.
- We store the receiver argument in a private attribute to use it later.
- We store the output argument in a private attribute to use it later.
The _run() Method¤
The abstract _run() method is called automatically by the base class when the actor is started.
Normally an actor should run forever (or until it is stopped), so it is very common to have an infinite loop in the _run() method, typically receiving messages from one or more channels (receivers), processing them and sending the results to other channels (senders).
However, it is worth noting that an actor can also be designed for a one-time execution or a limited number of runs, terminating upon completion.
Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor

class EchoActor(Actor):
    def __init__(
        self,
        receiver: Receiver[int],
        output: Sender[int],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._input: Receiver[int] = receiver
        self._output: Sender[int] = output

    async def _run(self) -> None: # (1)!
        async for msg in self._input: # (2)!
            await self._output.send(msg) # (3)!
- We implement the abstract _run() method.
- We receive messages from the receiver one by one.
- We send the received message to the output channel.
Stopping¤
By default, the stop() method will call the cancel() method (which will cancel all the tasks created by the actor) and will wait for them to finish.
This means that when an actor is stopped, the _run() method will receive a CancelledError exception. You should keep this in mind when implementing your actor and make sure to handle this exception properly if you need to do any cleanup.
The actor will handle the CancelledError exception automatically if it is not handled in the _run() method, so if there is no need for extra cleanup, you don't need to worry about it.
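A cleanup step on cancellation typically looks like the following standard-library sketch. The `worker` coroutine and the `events` list are hypothetical and only stand in for an actor's _run() method; re-raising the CancelledError after cleaning up is general asyncio practice and is assumed here, not taken from the SDK documentation.

```python
import asyncio

events: list[str] = []


async def worker() -> None:
    """Stand-in for a `_run()` method that needs cleanup when cancelled."""
    events.append("started")
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("cleanup")  # Release resources, flush buffers, etc.
        raise  # Re-raise so the caller can see the task was cancelled.


async def main() -> bool:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.03)  # Give the worker time to start.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(events, was_cancelled)  # ['started', 'cleanup'] True
```

A `finally` block works as well when the same cleanup should also run after ordinary errors.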
If an unhandled exception is raised in the _run() method, the actor will re-run the _run() method automatically. This ensures robustness in the face of errors, but you should also keep this in mind if you need to do any cleanup, to make sure the re-run doesn't cause any problems.
Tip
You could implement your own stop() method to, for example, send a message to a channel to notify that the actor should stop, or use any other more graceful way to stop the actor if you need to make sure it can't be interrupted at any await point.
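The idea in this tip can be sketched with the standard library: instead of cancelling the task, a special message asks it to finish once the current work is done. The `STOP` sentinel, the `worker` coroutine and the use of asyncio.Queue are assumptions made for this illustration; an SDK actor would use its own channels and its own stop() override.

```python
import asyncio

STOP = object()  # Hypothetical sentinel message meaning "please finish".


async def worker(mailbox: asyncio.Queue, processed: list[int]) -> None:
    while True:
        message = await mailbox.get()
        if message is STOP:
            break  # Stop only between messages, never in the middle of one.
        await asyncio.sleep(0.01)  # Simulated work that must not be interrupted.
        processed.append(message)


async def main() -> list[int]:
    mailbox: asyncio.Queue = asyncio.Queue()
    processed: list[int] = []
    task = asyncio.create_task(worker(mailbox, processed))
    for message in (1, 2, 3):
        await mailbox.put(message)
    await mailbox.put(STOP)  # Graceful stop: queued work is finished first.
    await task
    return processed


processed = asyncio.run(main())
print(processed)  # [1, 2, 3]
```

Because the task is never cancelled, every message queued before the stop request is fully processed.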
Spawning Extra Tasks¤
Actors run at least one background task, created automatically by the Actor class. But Actor inherits from BackgroundService, which provides a few methods to create and manage extra tasks.
If your actor needs to spawn extra tasks, you can use BackgroundService facilities to manage the tasks, so they are also automatically stopped when the actor is stopped.
All you need to do is add the newly spawned tasks to the actor's tasks set.
Example
import asyncio
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        extra_task = asyncio.create_task(self._extra_task()) # (1)!
        self.tasks.add(extra_task) # (2)!
        while True: # (3)!
            print("_run() running")
            await asyncio.sleep(1)

    async def _extra_task(self) -> None:
        while True: # (4)!
            print("_extra_task() running")
            await asyncio.sleep(1.1)

async with MyActor() as actor: # (5)!
    await asyncio.sleep(3) # (6)!
# (7)!
- We create a new task using asyncio.create_task().
- We add the task to the actor's tasks set. This ensures the task will be cancelled and cleaned up when the actor is stopped.
- We leave the actor running forever.
- The extra task will also run forever.
- The actor is started.
- We wait for 3 seconds. The actor should print a bunch of "_run() running" and "_extra_task() running" messages while it's running.
- The actor is stopped and the extra task is cancelled automatically.
Examples¤
Here are a few simple but complete examples to demonstrate how to create actors and connect them using channels.
Tip
The code examples are annotated with numbered markers (like # (1)!). The N-th marker corresponds to the N-th entry in the step-by-step explanation that follows each example. The annotations are numbered according to the order of execution.
Composing actors¤
This example shows how to create two actors and connect them using broadcast channels.
import asyncio
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.sdk.actor import Actor

class Actor1(Actor): # (1)!
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output

    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor1 forwarding: {msg!r}") # (8)!

class Actor2(Actor):
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output

    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor2 forwarding: {msg!r}") # (9)!

async def main() -> None: # (2)!
    # (4)!
    input_channel: Broadcast[str] = Broadcast("Input to Actor1")
    middle_channel: Broadcast[str] = Broadcast("Actor1 -> Actor2 stream")
    output_channel: Broadcast[str] = Broadcast("Actor2 output")
    input_sender = input_channel.new_sender()
    output_receiver = output_channel.new_receiver()

    async with ( # (5)!
        Actor1(input_channel.new_receiver(), middle_channel.new_sender(), "actor1"),
        Actor2(middle_channel.new_receiver(), output_channel.new_sender(), "actor2"),
    ):
        await input_sender.send("Hello") # (6)!
        msg = await output_receiver.receive() # (7)!
        print(msg) # (10)!
    # (11)!

if __name__ == "__main__": # (3)!
    asyncio.run(main())
- We define 2 actors, Actor1 and Actor2, that will just forward a message from an input channel to an output channel, adding some text.
- We define an async main() function with the main logic of our asyncio program.
- We start the main() function in the async loop using asyncio.run().
- We create a bunch of broadcast channels to connect our actors.
  - input_channel is the input channel for Actor1.
  - middle_channel is the channel that connects Actor1 and Actor2.
  - output_channel is the output channel for Actor2.
- We create two actors, Actor1 and Actor2, use them as async context managers, and connect them by creating new senders and receivers from the channels.
  Note: We don't use the run() function here because we want to stop the actors when we are done with them, but the actors would otherwise run forever (as long as the channel is not closed). So the async context manager is a better fit for this example.
- We schedule the sending of the message Hello to Actor1 via input_channel.
- We receive (await) the response from Actor2 via output_channel. Between this and the previous step, the async calls in the actors will be executed.
- Actor1 sends the re-formatted message (Actor1 forwarding: 'Hello') to Actor2 via the middle_channel.
- Actor2 sends the re-formatted message (Actor2 forwarding: "Actor1 forwarding: 'Hello'") to the output_channel.
- Finally, we print the received message, which will still be Actor2 forwarding: "Actor1 forwarding: 'Hello'".
- The actors are stopped and cleaned up automatically when the async with block ends.
The expected output is:
Actor2 forwarding: "Actor1 forwarding: 'Hello'"
Receiving from multiple channels¤
This example shows how to create an actor that receives messages from multiple broadcast channels using select().
import asyncio
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.sdk.actor import Actor, run

class EchoActor(Actor): # (1)!
    def __init__(
        self,
        receiver_1: Receiver[bool],
        receiver_2: Receiver[bool],
        output: Sender[bool],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver_1 = receiver_1
        self._receiver_2 = receiver_2
        self._output = output

    async def _run(self) -> None: # (2)!
        async for selected in select(self._receiver_1, self._receiver_2): # (10)!
            if selected_from(selected, self._receiver_1): # (11)!
                print(f"Received from receiver_1: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value: # (12)!
                    break
            elif selected_from(selected, self._receiver_2): # (13)!
                print(f"Received from receiver_2: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value: # (14)!
                    break
            else:
                assert False, "Unknown selected channel"
        print("EchoActor finished")
    # (15)!

# (3)!
input_channel_1 = Broadcast[bool]("input_channel_1")
input_channel_2 = Broadcast[bool]("input_channel_2")
echo_channel = Broadcast[bool]("echo_channel")

echo_actor = EchoActor( # (4)!
    input_channel_1.new_receiver(),
    input_channel_2.new_receiver(),
    echo_channel.new_sender(),
    "echo-actor",
)
echo_receiver = echo_channel.new_receiver() # (5)!

async def main() -> None: # (6)!
    # (8)!
    await input_channel_1.new_sender().send(True)
    await input_channel_2.new_sender().send(False)
    await run(echo_actor) # (9)!
    await echo_channel.close() # (16)!
    async for message in echo_receiver: # (17)!
        print(f"Received {message=}")

if __name__ == "__main__": # (7)!
    asyncio.run(main())
- We define an EchoActor that receives messages from two channels and sends them to another channel.
- We implement the _run() method that will receive messages from the two channels using select() and send them to the output channel. The _run() method will stop if a False message is received.
- We create the channels that will be used with the actor.
- We create the actor and connect it to the channels by creating new receivers and senders from the channels.
- We create a receiver for the echo_channel to eventually receive the messages sent by the actor.
- We define the main() function that will run the actor.
- We start the main() function in the async loop using asyncio.run().
- We send a message to each of the input channels. These messages will be queued in the channels until they are consumed by the actor.
- We start the actor and wait for it to finish using the run() function.
- The select() function will get the first message available from the two channels. The order in which they will be handled is unknown, but in this example we assume that the first message will be from input_channel_1 (True) and the second from input_channel_2 (False).
- The selected_from() function will return True for the input_channel_1 receiver. selected.value holds the received message, so "Received from receiver_1: True" will be printed and True will be sent to the output channel.
- Since selected.value is True, the loop will continue, going back to the select() function.
- The selected_from() function will return False for the input_channel_1 receiver and True for the input_channel_2 receiver. The message stored in selected.value will now be False, so "Received from receiver_2: False" will be printed and False will be sent to the output channel.
- Since selected.value is False, the loop will break.
- The _run() method will finish normally and the actor will be stopped, so the run() function will return.
- We close the echo_channel to make sure the echo_receiver will stop receiving messages after all the queued messages are consumed (otherwise step 17 will never end!).
- We receive the messages sent by the actor to the echo_channel one by one and print them. It should first print Received message=True and then Received message=False.
The expected output is:
Received from receiver_1: True
Received from receiver_2: False
EchoActor finished
Received message=True
Received message=False
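For readers without the SDK at hand, the idea of waiting on several sources at once can be approximated with the standard library by merging tagged messages into a single queue. This is a different technique from select() and says nothing about how select() is implemented; `forward`, `merged` and the queue names are hypothetical and exist only in this sketch.

```python
import asyncio


async def forward(name: str, source: asyncio.Queue, merged: asyncio.Queue) -> None:
    """Tag every message from `source` with `name` and put it in `merged`."""
    while True:
        merged.put_nowait((name, await source.get()))


async def main() -> list[tuple[str, bool]]:
    queue_1: asyncio.Queue[bool] = asyncio.Queue()
    queue_2: asyncio.Queue[bool] = asyncio.Queue()
    merged: asyncio.Queue[tuple[str, bool]] = asyncio.Queue()
    forwarders = [
        asyncio.create_task(forward("queue_1", queue_1, merged)),
        asyncio.create_task(forward("queue_2", queue_2, merged)),
    ]
    await queue_1.put(True)
    await queue_2.put(False)

    received: list[tuple[str, bool]] = []
    while True:
        name, value = await merged.get()  # Whichever source has data first.
        received.append((name, value))
        if not value:  # As in the example above, a False message ends the loop.
            break

    for task in forwarders:
        task.cancel()
    await asyncio.gather(*forwarders, return_exceptions=True)
    return received


received = asyncio.run(main())
print(received)
```

As with select(), the order in which messages from different sources are handled should not be relied upon in general.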