asyncio
frequenz.core.asyncio ¤
General purpose async tools.
This module provides general purpose async tools that can be used to simplify the development of asyncio-based applications.
The module provides the following classes and functions:
- cancel_and_await: A function that cancels a
task and waits for it to finish, handling
CancelledError
exceptions. - PersistentTaskGroup: An alternative to
asyncio.TaskGroup
to manage tasks that run until explicitly stopped. - Service: An interface for services running in the background.
- ServiceBase: A base class for implementing services running in the background.
- TaskCreator: A protocol for creating tasks.
Attributes¤
frequenz.core.asyncio.TaskReturnT
module-attribute
¤
TaskReturnT = TypeVar('TaskReturnT')
The type of the return value of a task.
Classes¤
frequenz.core.asyncio.PersistentTaskGroup ¤
A group of tasks that should run until explicitly stopped.
asyncio.TaskGroup
is a very convenient construct when using parallelization
for doing calculations for example, where the results for all the tasks need to be
merged together to produce a final result. In this case if one of the tasks fails,
it makes sense to cancel the others and abort as soon as possible, as any further
calculations would be thrown away.
This class is intended to help managing a group of tasks that should persist even if other tasks in the group fail, usually by either only discarding the failed task or by restarting it somehow.
This class is also typically used as a context manager, but in this case when the
context manager is exited, the tasks are not only awaited, they are first cancelled,
so all the background tasks are stopped. If any task was ended due to an unhandled
exception, the exception will be re-raised when the context manager exits as
BaseExceptionGroup
.
As with asyncio.TaskGroup
, the tasks should be created using the
create_task()
method.
To monitor the subtasks and handle exceptions or early termination,
a as_completed()
method
is provided, similar to asyncio.as_completed
but not quite the same. Using
this method is the only way to acknowledge tasks failures, so they are not raised
when the service is await
ed or when the context manager is exited.
Example
This program will run forever, printing the current time now and then and restarting the failing task each time it crashes.
import asyncio
import datetime
async def print_every(*, seconds: float) -> None:
while True:
await asyncio.sleep(seconds)
print(datetime.datetime.now())
async def fail_after(*, seconds: float) -> None:
await asyncio.sleep(seconds)
raise ValueError("I failed")
async def main() -> None:
async with PersistentTaskGroup() as group:
group.create_task(print_every(seconds=1), name="print_1")
group.create_task(print_every(seconds=11), name="print_11")
failing = group.create_task(fail_after(seconds=5), name=f"fail_5")
async for task in group.as_completed():
assert task.done() # For demonstration purposes only
try:
task.result()
except ValueError as error:
if failing == task:
failing = group.create_task(fail_after(seconds=5), name=f"fail_5")
else:
raise
asyncio.run(main())
Source code in frequenz/core/asyncio/_task_group.py
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 |
|
Attributes¤
is_running
property
¤
is_running: bool
Whether this task group is running.
A task group is considered running when at least one task is running.
tasks
property
¤
The set of tasks managed by this group.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Both running tasks and tasks pending for acknowledgment are included in the returned set.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
Functions¤
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None
Exit an async context.
Stop this instance.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
bool | None
|
Whether the exception was handled. |
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this group raised an exception. |
Source code in frequenz/core/asyncio/_task_group.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await for all tasks managed by this group to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this group raised an exception. |
Source code in frequenz/core/asyncio/_task_group.py
__del__ ¤
__init__ ¤
__init__(
*,
unique_id: str | None = None,
task_creator: TaskCreator = asyncio
) -> None
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
unique_id
|
The string to uniquely identify this instance. If
TYPE:
|
task_creator
|
The object that will be used to create tasks. Usually one of:
the
TYPE:
|
Source code in frequenz/core/asyncio/_task_group.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
Source code in frequenz/core/asyncio/_task_group.py
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
as_completed
async
¤
as_completed(
*, timeout: float | timedelta | None = None
) -> AsyncIterator[Task[Any]]
Iterate over running tasks yielding as they complete.
Stops iterating when there are no more running tasks and all done tasks have been acknowledged, or if the timeout is reached.
Note
If an exception is raised while yielding a task, the task will be considered not handled and will be yielded again until it is handled without raising any exceptions.
PARAMETER | DESCRIPTION |
---|---|
timeout
|
The maximum time to wait for the next task to complete. If |
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Task[Any]]
|
The tasks as they complete. |
Source code in frequenz/core/asyncio/_task_group.py
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this group.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
create_task ¤
create_task(
coro: Coroutine[Any, Any, TaskReturnT],
*,
name: str | None = None,
context: Context | None = None,
log_exception: bool = True
) -> Task[TaskReturnT]
Start a managed task.
A reference to the task will be held by the task group, so there is no need to save the task object.
Tasks can be retrieved via the
tasks
property.
Managed tasks always have a name
including information about the task group
itself. If you need to retrieve the final name of the task you can always do so
by calling .get_name()
on the returned task.
Tasks created this way will also be automatically cancelled when calling
cancel()
or
stop()
, or when the service is used
as a async context manager.
To inform that a finished task was properly handled, the method
as_completed()
should be used.
PARAMETER | DESCRIPTION |
---|---|
coro
|
The coroutine to be managed.
TYPE:
|
name
|
The name of the task. Names will always have the form
TYPE:
|
context
|
The context to be used for the task.
TYPE:
|
log_exception
|
Whether to log exceptions raised by the task.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Task[TaskReturnT]
|
The new task. |
Source code in frequenz/core/asyncio/_task_group.py
stop
async
¤
stop(msg: str | None = None) -> None
Stop this task group.
This method cancels all running tasks spawned by this group and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this group raised an exception. |
Source code in frequenz/core/asyncio/_task_group.py
frequenz.core.asyncio.Service ¤
Bases: ABC
A service running in the background.
A service swpawns one of more background tasks and can be started and stopped and can work as an async context manager to provide deterministic cleanup.
Warning
As services manage asyncio.Task
objects, a reference to a running service
must be held for as long as the service is expected to be running. Otherwise, its
tasks will be cancelled and the service will stop. For more information, please
refer to the Python asyncio
documentation.
Example
async def as_context_manager(service: Service) -> None:
async with service:
assert service.is_running
await asyncio.sleep(5)
assert not service.is_running
async def manual_start_stop(service: Service) -> None:
# Use only if necessary, as cleanup is more complicated
service.start()
await asyncio.sleep(5)
await service.stop()
Source code in frequenz/core/asyncio/_service.py
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
|
Attributes¤
Functions¤
__aexit__
abstractmethod
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None
Exit an async context.
Stop this service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
bool | None
|
Whether the exception was handled. |
Source code in frequenz/core/asyncio/_service.py
__await__
abstractmethod
¤
__await__() -> Generator[None, None, None]
Wait for this service to finish.
Wait until all the service tasks are finished.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/core/asyncio/_service.py
cancel
abstractmethod
¤
cancel(msg: str | None = None) -> None
Cancel this service.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
start
abstractmethod
¤
stop
abstractmethod
async
¤
stop(msg: str | None = None) -> None
Stop this service.
This method cancels the service and waits for it to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/core/asyncio/_service.py
frequenz.core.asyncio.ServiceBase ¤
A base class for implementing a service running in the background.
To implement a service, subclasses must implement the
start()
method, which should start the
background tasks needed by the service using the
create_task()
method.
If you need to collect results or handle exceptions of the tasks when stopping the
service, then you need to also override the
stop()
method, as the base
implementation does not collect any results and re-raises all exceptions.
Simple single-task example
import datetime
import asyncio
from typing_extensions import override
class Clock(ServiceBase):
def __init__(self, resolution_s: float, *, unique_id: str | None = None) -> None:
super().__init__(unique_id=unique_id)
self._resolution_s = resolution_s
@override
async def main(self) -> None:
while True:
await asyncio.sleep(self._resolution_s)
print(datetime.datetime.now())
async def main() -> None:
# As an async context manager
async with Clock(resolution_s=1):
await asyncio.sleep(5)
# Manual start/stop (only use if necessary, as cleanup is more complicated)
clock = Clock(resolution_s=1)
clock.start()
await asyncio.sleep(5)
await clock.stop()
asyncio.run(main())
Multi-tasks example
import asyncio
import datetime
from typing_extensions import override
class MultiTaskService(ServiceBase):
async def _print_every(self, *, seconds: float) -> None:
while True:
await asyncio.sleep(seconds)
print(datetime.datetime.now())
async def _fail_after(self, *, seconds: float) -> None:
await asyncio.sleep(seconds)
raise ValueError("I failed")
@override
async def main(self) -> None:
self.create_task(self._print_every(seconds=1), name="print_1")
self.create_task(self._print_every(seconds=11), name="print_11")
failing = self.create_task(self._fail_after(seconds=5), name=f"fail_5")
async for task in self.task_group.as_completed():
assert task.done() # For demonstration purposes only
try:
task.result()
except ValueError as error:
if failing == task:
failing = self.create_task(
self._fail_after(seconds=5), name=f"fail_5"
)
else:
raise
async def main() -> None:
async with MultiTaskService():
await asyncio.sleep(11)
asyncio.run(main())
Source code in frequenz/core/asyncio/_service.py
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 |
|
Attributes¤
is_running
property
¤
is_running: bool
Whether this service is running.
A service is considered running when at least one task is running.
task_group
property
¤
task_group: PersistentTaskGroup
The task group managing the tasks of this service.
Functions¤
__aenter__
async
¤
__aenter__() -> Self
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None
Exit an async context.
Stop this service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
bool | None
|
Whether the exception was handled. |
Source code in frequenz/core/asyncio/_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this service.
An awaited service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/core/asyncio/_service.py
__del__ ¤
__init__ ¤
__init__(
*,
unique_id: str | None = None,
task_creator: TaskCreator = asyncio
) -> None
Initialize this Service.
PARAMETER | DESCRIPTION |
---|---|
unique_id
|
The string to uniquely identify this service instance.
If
TYPE:
|
task_creator
|
The object that will be used to create tasks. Usually one of:
the
TYPE:
|
Source code in frequenz/core/asyncio/_service.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
Source code in frequenz/core/asyncio/_service.py
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this service.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
Source code in frequenz/core/asyncio/_service.py
create_task ¤
create_task(
coro: Coroutine[Any, Any, TaskReturnT],
*,
name: str | None = None,
context: Context | None = None,
log_exception: bool = True
) -> Task[TaskReturnT]
Start a managed task.
A reference to the task will be held by the service, so there is no need to save the task object.
Tasks are created using the
task_group
.
Managed tasks always have a name
including information about the service
itself. If you need to retrieve the final name of the task you can always do so
by calling .get_name()
on the returned task.
Tasks created this way will also be automatically cancelled when calling
cancel()
or
stop()
, or when the service is used
as a async context manager.
PARAMETER | DESCRIPTION |
---|---|
coro
|
The coroutine to be managed.
TYPE:
|
name
|
The name of the task. Names will always have the form
TYPE:
|
context
|
The context to be used for the task.
TYPE:
|
log_exception
|
Whether to log exceptions raised by the task.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Task[TaskReturnT]
|
The new task. |
Source code in frequenz/core/asyncio/_service.py
main
abstractmethod
async
¤
start ¤
stop
async
¤
stop(msg: str | None = None) -> None
Stop this service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/core/asyncio/_service.py
frequenz.core.asyncio.TaskCreator ¤
Bases: Protocol
A protocol for creating tasks.
Built-in asyncio functions and classes implementing this protocol:
asyncio
asyncio.AbstractEventLoop
(returned byasyncio.get_event_loop
for example)asyncio.TaskGroup
Source code in frequenz/core/asyncio/_util.py
Functions¤
create_task ¤
create_task(
coro: Coroutine[Any, Any, TaskReturnT],
*,
name: str | None = None,
context: Context | None = None
) -> Task[TaskReturnT]
Create a task.
PARAMETER | DESCRIPTION |
---|---|
coro
|
The coroutine to be executed.
TYPE:
|
name
|
The name of the task.
TYPE:
|
context
|
The context to be used for the task.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Task[TaskReturnT]
|
The new task. |
Source code in frequenz/core/asyncio/_util.py
Functions¤
frequenz.core.asyncio.cancel_and_await
async
¤
Cancel a task and wait for it to finish.
Exits immediately if the task is already done.
The CancelledError
is suppressed, but any other exception will be propagated.
PARAMETER | DESCRIPTION |
---|---|
task
|
The task to be cancelled and waited for. |