frequenz.sdk.timeseries ¤
Handling of timeseries streams.
A timeseries is a stream (normally an async iterator) of `Sample`s.
Periodicity and alignment¤
All the data produced by this package is always periodic, in UTC, and aligned to the Epoch (by default).
Classes normally take a (re)sampling period as an argument and, optionally, an `align_to` argument.
This means timestamps are always separated by exactly one period, and each timestamp always falls on a multiple of the period, counted from `align_to`.
This ensures that the data is predictable and consistent among restarts.
Example
If we have a period of 10 seconds and are aligning to the UNIX epoch, the timeline of timestamps starts at 1970-01-01 00:00:00 UTC. If our current `now` is 1970-01-01 00:00:32 UTC, then the next timestamp will be at 1970-01-01 00:00:40 UTC.
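The arithmetic behind this example can be sketched in a few lines of plain Python. This is an illustration only, not the SDK's implementation: the helper name `next_aligned_timestamp` is hypothetical, and it assumes the next timestamp is the first period boundary strictly after `now`.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def next_aligned_timestamp(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first multiple of `period` after `now`, counted from `align_to`."""
    # timedelta // timedelta is a floor division that yields an int
    periods_elapsed = (now - align_to) // period
    return align_to + (periods_elapsed + 1) * period


now = datetime(1970, 1, 1, 0, 0, 32, tzinfo=timezone.utc)
next_ts = next_aligned_timestamp(now, timedelta(seconds=10))
print(next_ts)  # 1970-01-01 00:00:40+00:00
```

Because the result depends only on `now`, the period and `align_to`, a restarted process computes the same boundaries as before.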
Attributes¤
frequenz.sdk.timeseries.DEFAULT_BUFFER_LEN_INIT
module-attribute
¤
Default initial buffer length.
Buffers will be created initially with this length, but they could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.
frequenz.sdk.timeseries.DEFAULT_BUFFER_LEN_MAX
module-attribute
¤
Default maximum allowed buffer length.
If a buffer length would get bigger than this, it will be truncated to this length.
frequenz.sdk.timeseries.DEFAULT_BUFFER_LEN_WARN
module-attribute
¤
Default minimum buffer length that will produce a warning.
If a buffer length would get bigger than this, a warning will be logged.
frequenz.sdk.timeseries.Sink
module-attribute
¤
A sink for a timeseries.
A new timeseries can be generated by sending samples to a sink.
This should be an async callable that takes the sample to send as its only argument.
PARAMETER | DESCRIPTION |
---|---|
sample | A sample to be sent out. |
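A sink of this shape might look like the sketch below. It is self-contained on purpose: `DemoSample` is a hypothetical stand-in for the SDK's `Sample` type, and `collecting_sink` is an illustrative name, not part of the package.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class DemoSample:
    """Hypothetical stand-in for the SDK's Sample type."""

    timestamp: datetime
    value: float


received: list[DemoSample] = []


async def collecting_sink(sample: DemoSample) -> None:
    """Store each sample sent to the sink (a real sink might forward it to a channel)."""
    received.append(sample)


async def main() -> None:
    await collecting_sink(DemoSample(datetime(2023, 1, 1, tzinfo=timezone.utc), 10.0))


asyncio.run(main())
```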
frequenz.sdk.timeseries.Source
module-attribute
¤
Source = AsyncIterator[Sample[Quantity]]
A source for a timeseries.
A timeseries can be received sample by sample in a streaming way using a source.
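As a rough illustration of consuming a source, the sketch below uses an async generator as the async iterator. The name `demo_source` and the plain `(timestamp, value)` tuples are assumptions made to keep the example self-contained; a real source yields `Sample` objects.

```python
import asyncio
from collections.abc import AsyncIterator
from datetime import datetime, timedelta, timezone


async def demo_source(count: int) -> AsyncIterator[tuple[datetime, float]]:
    """Yield `count` (timestamp, value) pairs with timestamps one second apart."""
    start = datetime(2023, 1, 1, tzinfo=timezone.utc)
    for i in range(count):
        yield (start + timedelta(seconds=i), float(i))


async def consume() -> list[float]:
    """Receive the timeseries sample by sample."""
    collected = []
    async for _timestamp, value in demo_source(3):
        collected.append(value)
    return collected


values = asyncio.run(consume())
```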
Classes¤
frequenz.sdk.timeseries.Bounds
dataclass
¤
Bases: Generic[_T]
Lower and upper bound values.
Depending on the generic type `_T`, the lower and upper bounds can be `None`, in which case it means that there is no lower or upper bound, respectively.
When checking if an item is within the bounds, the item must never be `None`.
Source code in src/frequenz/sdk/timeseries/_base_types.py
Attributes¤
Functions¤
__contains__ ¤
__contains__(item: _T) -> bool
Check if the value is within the range of the container.
PARAMETER | DESCRIPTION |
---|---|
item | The value to check. Can't be `None`. |
RETURNS | DESCRIPTION |
---|---|
bool | True if value is within the range, otherwise False. |
Source code in src/frequenz/sdk/timeseries/_base_types.py
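The containment check described above can be sketched as follows. `DemoBounds` is a hypothetical float-only stand-in, and treating both bounds as inclusive is an assumption, not something this page states.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DemoBounds:
    """Hypothetical stand-in for Bounds, specialised to floats."""

    lower: Optional[float]
    upper: Optional[float]

    def __contains__(self, item: float) -> bool:
        # A bound of None means "unbounded" on that side.
        # Assumption: both bounds are inclusive.
        if self.lower is not None and item < self.lower:
            return False
        if self.upper is not None and item > self.upper:
            return False
        return True


print(5.0 in DemoBounds(lower=0.0, upper=10.0))   # True
print(11.0 in DemoBounds(lower=0.0, upper=10.0))  # False
print(1e9 in DemoBounds(lower=0.0, upper=None))   # True
```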
frequenz.sdk.timeseries.ClocksInfo
dataclass
¤
Information about the wall clock and monotonic clock and their drift.
The `monotonic_requested_sleep` and `monotonic_elapsed` values must be strictly positive, while `wall_clock_elapsed` can be negative if the wall clock jumped back in time.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
Attributes¤
monotonic_drift
property
¤
monotonic_drift: timedelta
The difference between the monotonic elapsed and requested sleep time.
This number should always be positive, as the monotonic time should never jump back in time.
monotonic_elapsed
instance-attribute
¤
monotonic_elapsed: timedelta
The elapsed time in monotonic time (must be non-negative).
monotonic_requested_sleep
instance-attribute
¤
monotonic_requested_sleep: timedelta
The requested monotonic sleep time used to gather the information (must be positive).
monotonic_time
instance-attribute
¤
monotonic_time: float
The monotonic time right after the sleep was done.
wall_clock_elapsed
instance-attribute
¤
wall_clock_elapsed: timedelta
The elapsed time in wall clock time.
wall_clock_factor
class-attribute
instance-attribute
¤
The factor to convert wall clock time to monotonic time.
Typically, if the wall clock time expanded compared to the monotonic time (i.e. is more in the future), the returned value will be smaller than 1. If the wall clock time compressed compared to the monotonic time (i.e. is more in the past), the returned value will be bigger than 1.
In cases where there are big time jumps this might be overridden by the previous wall clock factor to avoid adjusting by excessive amounts, when the time will resync anyway to catch up.
wall_clock_jump
property
¤
wall_clock_jump: timedelta
The amount of time the wall clock jumped compared to the monotonic time.
If the wall clock is faster than the monotonic time (or jumped forward in time), the returned value will be positive. If the wall clock is slower than the monotonic time (or jumped backwards in time), the returned value will be negative.
Note
Strictly speaking, both could be in sync and the result would be 0.0, but this is extremely unlikely due to floating point precision and the fact that both clocks are read at slightly different times.
wall_clock_time
instance-attribute
¤
wall_clock_time: datetime
The wall clock time right after the sleep was done.
Functions¤
__post_init__ ¤
Check that the values are valid.
RAISES | DESCRIPTION |
---|---|
ValueError | If any value is out of range. |
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
wall_clock_to_monotonic ¤
Convert a wall clock timedelta to a monotonic timedelta.
This is useful to calculate how much one should sleep on the monotonic clock to reach a particular wall clock time, adjusting to the difference in speed or jumps between both.
PARAMETER | DESCRIPTION |
---|---|
wall_clock_timedelta | The wall clock timedelta to convert. |
RETURNS | DESCRIPTION |
---|---|
timedelta | The monotonic timedelta corresponding to |
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
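To make the relationship between these values concrete, here is a small worked sketch using plain `timedelta` arithmetic. The formulas are assumptions chosen to match the descriptions above (for instance, the factor is taken as the ratio of monotonic to wall clock elapsed time); they are not copied from the SDK.

```python
from datetime import timedelta

monotonic_requested_sleep = timedelta(seconds=1.0)
monotonic_elapsed = timedelta(seconds=1.5)
wall_clock_elapsed = timedelta(seconds=2.0)

# Extra time spent on the monotonic clock beyond what was requested
monotonic_drift = monotonic_elapsed - monotonic_requested_sleep

# Positive when the wall clock advanced more than the monotonic clock
wall_clock_jump = wall_clock_elapsed - monotonic_elapsed

# Assumed definition: smaller than 1 when the wall clock ran ahead
wall_clock_factor = monotonic_elapsed / wall_clock_elapsed


def wall_clock_to_monotonic(wall_clock_timedelta: timedelta) -> timedelta:
    """Scale a wall clock duration to the equivalent monotonic duration."""
    return wall_clock_timedelta * wall_clock_factor


print(monotonic_drift)    # 0:00:00.500000
print(wall_clock_jump)    # 0:00:00.500000
print(wall_clock_factor)  # 0.75
print(wall_clock_to_monotonic(timedelta(seconds=4)))  # 0:00:03
```

With these numbers the wall clock ran ahead, so sleeping 3 seconds on the monotonic clock is expected to cover 4 seconds of wall clock time.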
frequenz.sdk.timeseries.Fuse
dataclass
¤
frequenz.sdk.timeseries.MovingWindow ¤
Bases: BackgroundService
A data window that moves with the latest datapoints of a data stream.
After initialization the `MovingWindow` can be accessed by an integer index or a timestamp. A sub window can be accessed by using a slice of integers or timestamps.
Note that a numpy ndarray is returned and thus users can use NumPy operations directly on a window.
The window uses a ring buffer for storage and the first element is aligned to a fixed, defined point in time. Because the window moves, the timestamps of the first and last elements are constantly changing, and therefore the point in time that defines the alignment can be outside of the time window. Modulo arithmetic is used to move the `align_to` timestamp into the latest window.
If, for example, the `align_to` parameter is set to `datetime(1, 1, 1, tzinfo=timezone.utc)` and the window size is bigger than one day, then the first element will always be aligned to midnight.
Resampling might be required to reduce the number of samples to store, and it can be set by specifying the resampler config parameter so that the user can control the granularity of the samples to be stored in the underlying buffer.
If resampling is not required, the resampler config parameter can be set to None in which case the MovingWindow will not perform any resampling.
Example: Calculate the mean of a time interval
```python
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample


async def send_mock_data(sender: Sender[Sample]) -> None:
    """Send one mock sample per second, forever."""
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)


async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    # keep a reference to the task so it is not garbage collected
    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    async with MovingWindow(
        size=timedelta(seconds=5),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        time_start = datetime.now(tz=timezone.utc)
        time_end = time_start + timedelta(seconds=5)

        # ... wait for 5 seconds until the buffer is filled
        await asyncio.sleep(5)

        # return a numpy array from the window
        array = window[time_start:time_end]
        # and use it to, for example, calculate the mean
        mean = array.mean()


asyncio.run(run())
```
Example: Create a polars data frame from a MovingWindow
```python
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample


async def send_mock_data(sender: Sender[Sample]) -> None:
    """Send one mock sample per second, forever."""
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)


async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    # keep a reference to the task so it is not garbage collected
    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    # create a window that stores two days of data
    # starting at 1.1.23 with samplerate=1
    async with MovingWindow(
        size=timedelta(days=2),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        # wait for one full day until the buffer is filled
        await asyncio.sleep(60 * 60 * 24)

        time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
        time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)

        # You can now create a polars series with one full day of data by
        # passing the window slice, like:
        # series = pl.Series("Jan_1", window[time_start:time_end])


asyncio.run(run())
```
Source code in src/frequenz/sdk/timeseries/_moving_window.py
Attributes¤
capacity
property
¤
capacity: int
Return the capacity of the MovingWindow.
Capacity is the maximum number of samples that can be stored in the MovingWindow.
RETURNS | DESCRIPTION |
---|---|
int | The capacity of the MovingWindow. |
is_running
property
¤
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this background service is running. |
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str | The name of this background service. |
newest_timestamp
property
¤
newest_timestamp: datetime | None
Return the newest timestamp of the MovingWindow.
RETURNS | DESCRIPTION |
---|---|
datetime \| None | The newest timestamp of the MovingWindow or None if the buffer is empty. |
oldest_timestamp
property
¤
oldest_timestamp: datetime | None
Return the oldest timestamp of the MovingWindow.
RETURNS | DESCRIPTION |
---|---|
datetime \| None | The oldest timestamp of the MovingWindow or None if the buffer is empty. |
sampling_period
property
¤
sampling_period: timedelta
Return the sampling period of the MovingWindow.
RETURNS | DESCRIPTION |
---|---|
timedelta | The sampling period of the MovingWindow. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior. Don't do it unless the class explicitly documents that it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]] | The set of running tasks spawned by this background service. |
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self | This background service. |
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type | The type of the exception raised, if any. |
exc_val | The exception raised, if any. |
exc_tb | The traceback of the exception raised, if any. |
Source code in src/frequenz/sdk/actor/_background_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None | An implementation-specific generator for the awaitable. |
Source code in src/frequenz/sdk/actor/_background_service.py
__getitem__ ¤
__getitem__(key: SupportsIndex) -> float
__getitem__(
key: SupportsIndex | datetime | slice,
) -> float | ArrayLike
Return a sub window of the `MovingWindow`.
The `MovingWindow` is accessed either by timestamp or by index or by a slice of timestamps or integers.
- If the key is an integer, the float value at the given position is returned.
- If the key is a timestamp, the float value that corresponds to the timestamp is returned.
- If the key is a slice of timestamps or integers, an ndarray is returned, where the bounds correspond to the slice bounds. Note that a half-open interval, which is open at the end, is returned.
Note
Slicing with a step other than 1 is not supported.
PARAMETER | DESCRIPTION |
---|---|
key | Either an integer or a timestamp or a slice of timestamps or integers. |
RAISES | DESCRIPTION |
---|---|
IndexError | When requesting an out of range timestamp or index. |
ValueError | When requesting a slice with a step other than 1. |
RETURNS | DESCRIPTION |
---|---|
float \| ArrayLike | A float if the key is a number or a timestamp. |
float \| ArrayLike | A NumPy array if the key is a slice. |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
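The three access modes and the half-open slice semantics can be illustrated with a toy container. `DemoWindow` is hypothetical and list-backed: it is not the SDK's ring buffer, it returns plain lists instead of NumPy arrays, and it assumes the first stored sample sits exactly at `start`.

```python
from datetime import datetime, timedelta, timezone


class DemoWindow:
    """Hypothetical list-backed window; not the SDK's ring buffer."""

    def __init__(self, start: datetime, period: timedelta, values: list[float]):
        self._start = start
        self._period = period
        self._values = values

    def _to_index(self, key):
        # Timestamps are converted to positions relative to the first sample.
        if isinstance(key, datetime):
            index = (key - self._start) // self._period
            if index < 0:
                raise IndexError("timestamp is before the start of the window")
            return index
        return key

    def __getitem__(self, key):
        if isinstance(key, slice):
            if key.step not in (None, 1):
                raise ValueError("slicing with a step other than 1 is not supported")
            start = None if key.start is None else self._to_index(key.start)
            stop = None if key.stop is None else self._to_index(key.stop)
            return self._values[start:stop]  # half-open: stop is excluded
        return self._values[self._to_index(key)]


t0 = datetime(2023, 1, 1, tzinfo=timezone.utc)
second = timedelta(seconds=1)
window = DemoWindow(t0, second, [1.0, 2.0, 3.0, 4.0])

print(window[0])                     # 1.0
print(window[t0 + 2 * second])       # 3.0
print(window[t0 : t0 + 2 * second])  # [1.0, 2.0]
```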
__init__ ¤
__init__(
*,
size: timedelta,
resampled_data_recv: Receiver[Sample[Quantity]],
input_sampling_period: timedelta,
resampler_config: ResamplerConfig | None = None,
align_to: datetime = UNIX_EPOCH,
name: str | None = None
) -> None
Initialize the MovingWindow.
This method creates the underlying ring buffer and starts a new task that updates the ring buffer with new incoming samples. The task stops running only if the channel receiver is closed.
PARAMETER | DESCRIPTION |
---|---|
size | The time span of the moving window over which samples will be stored. |
resampled_data_recv | A receiver that delivers samples with a given sampling period. |
input_sampling_period | The time interval between consecutive input samples. |
resampler_config | The resampler configuration in case resampling is required. |
align_to | A timestamp that defines a point in time to which the window is aligned, modulo the window size. For further information, consult the class-level documentation. |
name | The name of this moving window. If |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
at ¤
Return the sample at the given index or timestamp.
In contrast to the `window` method, which expects a slice as argument, this method expects a single index as argument and returns a single value.
PARAMETER | DESCRIPTION |
---|---|
key | The index or timestamp of the sample to return. |
RETURNS | DESCRIPTION |
---|---|
float | The sample at the given index or timestamp. |
RAISES | DESCRIPTION |
---|---|
IndexError | If the buffer is empty or the index is out of bounds. |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be passed to the tasks being cancelled. |
Source code in src/frequenz/sdk/actor/_background_service.py
count_covered ¤
Count the number of samples that are covered by the oldest and newest valid samples.
If `since` and `until` are provided, the count is limited to the samples between (and including) the given timestamps.
PARAMETER | DESCRIPTION |
---|---|
since | The timestamp from which to start counting. If |
until | The timestamp until (and including) which to count. If |
RETURNS | DESCRIPTION |
---|---|
int | The count of samples between the oldest and newest (inclusive) valid samples, or 0 if no time range is covered. |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
count_valid ¤
Count the number of valid samples in this `MovingWindow`.
If `since` and `until` are provided, the count is limited to the samples between (and including) the given timestamps.
PARAMETER | DESCRIPTION |
---|---|
since | The timestamp from which to start counting. If |
until | The timestamp until (and including) which to count. If |
RETURNS | DESCRIPTION |
---|---|
int | The number of valid samples in this |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
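The difference between the two counts is easiest to see on a small buffer with gaps. The sketch below assumes that missing samples are represented as NaN and works on a plain list; the free functions are illustrative stand-ins for the `count_valid` and `count_covered` methods, without the `since` and `until` arguments.

```python
import math


def count_valid(values: list[float]) -> int:
    """Count the samples that are not NaN."""
    return sum(1 for v in values if not math.isnan(v))


def count_covered(values: list[float]) -> int:
    """Count the samples from the oldest to the newest valid one (inclusive)."""
    valid_positions = [i for i, v in enumerate(values) if not math.isnan(v)]
    if not valid_positions:
        return 0
    return valid_positions[-1] - valid_positions[0] + 1


nan = math.nan
values = [nan, 1.0, nan, 2.0, nan]

print(count_valid(values))    # 2
print(count_covered(values))  # 3
```

Here the gap between the two valid samples is part of the covered range but does not count as valid.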
start ¤
Start the MovingWindow.
This method starts the MovingWindow tasks.
Source code in src/frequenz/sdk/timeseries/_moving_window.py
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be passed to the tasks being cancelled. |
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception. |
Source code in src/frequenz/sdk/actor/_background_service.py
wait
async
¤
Wait for this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in src/frequenz/sdk/actor/_background_service.py
wait_for_samples
async
¤
wait_for_samples(n: int) -> None
Wait until the next `n` samples are available in the MovingWindow.
This function returns after `n` new samples are available in the MovingWindow, without considering whether the new samples are valid. The validity of the samples can be verified by calling the `count_valid` method.
PARAMETER | DESCRIPTION |
---|---|
n | The number of samples to wait for. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
If |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
window ¤
window(
start: datetime | int | None,
end: datetime | int | None,
*,
force_copy: bool = True,
fill_value: float | None = nan
) -> ArrayLike
Return an array containing the samples in the given time interval.
In contrast to the `at` method, which expects a single index as argument, this method expects a slice as argument and returns an array.
PARAMETER | DESCRIPTION |
---|---|
start | The start timestamp of the time interval. If |
end | The end timestamp of the time interval. If |
force_copy | If |
fill_value | If not None, will use this value to fill missing values. If missing values should be set, force_copy must be True. Defaults to NaN to avoid returning outdated data unexpectedly. |
RETURNS | DESCRIPTION |
---|---|
ArrayLike | An array containing the samples in the given time interval. |
Source code in src/frequenz/sdk/timeseries/_moving_window.py
frequenz.sdk.timeseries.PeriodicFeatureExtractor ¤
A feature extractor for historical timeseries data.
This class is creating a profile from periodically occurring windows in a buffer of historical data.
The profile is created out of all windows that are fully contained in the underlying buffer with the same start and end time modulo a fixed period.
Consider for example a timeseries $T$ of historical data and sub-series $S_i$ of $T$ all having the same size $l$ and the same fixed distance $p$ called period, where period of two sub-windows is defined as the distance of two points at the same position within the sub-windows.
This class calculates a statistical profile $S$ over all $S_i$, i.e. the value of $S$ at position $i$ is calculated by performing a certain calculation, e.g. an average, over all values of $S_i$ at position $i$.
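A minimal sketch of this idea on a plain list follows. The function name `periodic_profile` and its integer, sample-based `period` and `length` arguments are assumptions for illustration; unlike the class, it does not exclude any window.

```python
def periodic_profile(series, period, length, weights=None):
    """Average the sub-series of `length` samples that start every `period` samples."""
    # Only windows that fit completely inside the series are used.
    starts = range(0, len(series) - length + 1, period)
    windows = [series[s : s + length] for s in starts]
    if weights is None:
        weights = [1.0] * len(windows)
    if len(weights) != len(windows):
        raise ValueError("need exactly one weight per window")
    total = sum(weights)
    # Position by position, combine the values of all windows.
    return [
        sum(w * win[i] for w, win in zip(weights, windows)) / total
        for i in range(length)
    ]


series = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
print(periodic_profile(series, period=2, length=2))                     # [3.0, 4.0]
print(periodic_profile(series, period=2, length=2, weights=[1, 1, 2]))  # [3.5, 4.5]
```

The windows here are `[1, 2]`, `[3, 4]` and `[5, 6]` (oldest first), so the weighted call gives the newest window twice the influence of the others.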
Note
The oldest window or the window that is currently overwritten in the `MovingWindow` is not considered in the profile.
Note
When constructing a `PeriodicFeatureExtractor` object, the `MovingWindow` size has to be an integer multiple of the period.
Example
```python
from datetime import datetime, timedelta, timezone

from frequenz.sdk import microgrid
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor

async with MovingWindow(
    size=timedelta(days=35),
    resampled_data_recv=microgrid.grid().power.new_receiver(),
    input_sampling_period=timedelta(seconds=1),
) as moving_window:
    feature_extractor = PeriodicFeatureExtractor(
        moving_window=moving_window,
        period=timedelta(days=7),
    )

    now = datetime.now(timezone.utc)

    # create a daily weighted average for the next 24h
    avg_24h = feature_extractor.avg(
        now,
        now + timedelta(hours=24),
        weights=[0.1, 0.2, 0.3, 0.4]
    )

    # create a daily average for Thursday March 23 2023
    th_avg_24h = feature_extractor.avg(datetime(2023, 3, 23), datetime(2023, 3, 24))
```
Source code in src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
Functions¤
__init__ ¤
__init__(
moving_window: MovingWindow, period: timedelta
) -> None
Initialize a PeriodicFeatureExtractor object.
PARAMETER | DESCRIPTION |
---|---|
moving_window | The MovingWindow that is used for the average calculation. |
period | The distance between two succeeding intervals. |
RAISES | DESCRIPTION |
---|---|
ValueError | If the MovingWindow size is not an integer multiple of the period. |
Source code in src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
avg ¤
Create the average window out of the window defined by `start` and `end`.
This method calculates the average of a window by averaging over all windows fully inside the MovingWindow having the period `self.period`.
PARAMETER | DESCRIPTION |
---|---|
start | The start of the window to average over. |
end | The end of the window to average over. |
weights | The weights to use for the average calculation (oldest first). |
RETURNS | DESCRIPTION |
---|---|
NDArray[float64] | The averaged timeseries window. |
Source code in src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
frequenz.sdk.timeseries.ReceiverFetcher ¤
Bases: Generic[T_co], Protocol
An interface that just exposes a `new_receiver` method.
Source code in src/frequenz/sdk/_internal/_channels.py
frequenz.sdk.timeseries.ResamplerConfig
dataclass
¤
Resampler configuration.
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
Attributes¤
align_to
class-attribute
instance-attribute
¤
align_to: datetime | None = UNIX_EPOCH
The time to align the resampling period to.
The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of `resampling_period` starting from `align_to`. It must be an aware datetime and can be in the future too.
If `align_to` is `None`, the resampling period will be aligned to the time the resampler is created.
initial_buffer_len
class-attribute
instance-attribute
¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
The initial length of the resampling buffer.
The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.
It must be at least 1 and at most `max_buffer_len`.
max_buffer_len
class-attribute
instance-attribute
¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
The maximum length of the resampling buffer.
Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.
It must be bigger than `warn_buffer_len`.
max_data_age_in_periods
class-attribute
instance-attribute
¤
max_data_age_in_periods: float = 3.0
The maximum age a sample can have to be considered relevant for resampling.
Expressed in number of periods, where period is the `resampling_period` if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).
It must be bigger than 1.0.
Example
If `resampling_period` is 3 seconds, the input sampling period is 1 second and `max_data_age_in_periods` is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.
If `resampling_period` is 3 seconds, the input sampling period is 5 seconds and `max_data_age_in_periods` is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
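Both cases of this example reduce to multiplying the longer of the two periods by `max_data_age_in_periods`. The helper below is a hypothetical sketch of that arithmetic, not a function of the SDK.

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are considered irrelevant."""
    # Downsampling uses the resampling period, upsampling the input period;
    # in both cases that is the longer of the two.
    return max(resampling_period, input_period) * max_data_age_in_periods


downsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2)
upsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2)
print(downsampling)  # 0:00:06
print(upsampling)    # 0:00:10
```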
resampling_function
class-attribute
instance-attribute
¤
resampling_function: ResamplingFunction = (
lambda samples, _, __: fmean((s[1]) for s in samples)
)
The resampling function.
This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.
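A custom resampling function could follow the same three-argument shape as the default lambda. In the sketch below the parameter names `_resampler_config` and `_source_properties` are guesses at what the two ignored arguments carry, and the samples are assumed to be `(timestamp, value)` pairs ordered oldest first.

```python
from datetime import datetime, timedelta, timezone
from statistics import fmean


def mean_resampling(samples, _resampler_config, _source_properties):
    """Same behaviour as the default: average of the sample values."""
    return fmean(s[1] for s in samples)


def latest_value_resampling(samples, _resampler_config, _source_properties):
    """Alternative: use the value of the newest relevant sample."""
    return samples[-1][1]


t0 = datetime(2023, 1, 1, tzinfo=timezone.utc)
samples = [
    (t0, 1.0),
    (t0 + timedelta(seconds=1), 2.0),
    (t0 + timedelta(seconds=2), 6.0),
]

print(mean_resampling(samples, None, None))          # 3.0
print(latest_value_resampling(samples, None, None))  # 6.0
```

Either function could then be supplied as the `resampling_function` field when building the configuration.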
resampling_period
instance-attribute
¤
resampling_period: timedelta
The resampling period.
This is the time that passes between consecutive calculations of resampled data.
It must be a positive time span.
warn_buffer_len
class-attribute
instance-attribute
¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
The minimum length of the resampling buffer that will emit a warning.
If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.
It must be at least 1 and at most `max_buffer_len`.
Functions¤
__post_init__ ¤
Check that config values are valid.
RAISES | DESCRIPTION |
---|---|
ValueError | If any value is out of range. |
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
frequenz.sdk.timeseries.ResamplerConfig2
dataclass
¤
Bases: ResamplerConfig
Resampler configuration.
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
Attributes¤
align_to
class-attribute
instance-attribute
¤
Deprecated: Use timer_config.align_to instead.
initial_buffer_len
class-attribute
instance-attribute
¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
The initial length of the resampling buffer.
The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.
It must be at least 1 and at most `max_buffer_len`.
max_buffer_len
class-attribute
instance-attribute
¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
The maximum length of the resampling buffer.
Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.
It must be bigger than `warn_buffer_len`.
max_data_age_in_periods
class-attribute
instance-attribute
¤
max_data_age_in_periods: float = 3.0
The maximum age a sample can have to be considered relevant for resampling.
Expressed in number of periods, where period is the `resampling_period` if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).
It must be bigger than 1.0.
Example
If resampling_period
is 3 seconds, the input sampling period is
1 and max_data_age_in_periods
is 2, then data older than 3*2
= 6 seconds will be discarded when creating a new sample and never
passed to the resampling function.
If resampling_period
is 3 seconds, the input sampling period is
5 and max_data_age_in_periods
is 2, then data older than 5*2
= 10 seconds will be discarded when creating a new sample and never
passed to the resampling function.
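Both cases in the example come down to multiplying the larger of the two periods by max_data_age_in_periods. The sketch below only reproduces that arithmetic; `max_data_age` is a hypothetical helper name and not part of the SDK.

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples would be discarded (illustrative only)."""
    # Downsampling uses the resampling period and upsampling uses the input
    # period, which is the same as taking the larger of the two.
    relevant_period = max(resampling_period, input_period)
    return relevant_period * max_data_age_in_periods


# The two cases from the example above.
downsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2)
upsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2)
```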
resampling_function
class-attribute
instance-attribute
¤
resampling_function: ResamplingFunction2 = (
lambda samples, _, __: fmean((s[1]) for s in samples)
)
The resampling function.
This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.
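As an illustration, a custom resampling function only needs to accept the three positional arguments of the protocol and return a float. The sketch below is standalone: the config and source-properties arguments are typed as `Any` so it runs without the SDK, and `mean_value` and `latest_value` are hypothetical names chosen for this example.

```python
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
from statistics import fmean
from typing import Any


def mean_value(
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: Any,  # stands in for the resampler configuration
    source_properties: Any,  # stands in for SourceProperties
) -> float:
    """Average all relevant values, like the default lambda shown above."""
    return fmean(value for _, value in input_samples)


def latest_value(
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: Any,
    source_properties: Any,
) -> float:
    """Return the value of the sample with the newest timestamp."""
    # Pick by timestamp instead of assuming a particular ordering.
    return max(input_samples, key=lambda sample: sample[0])[1]


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
samples = [(start + timedelta(seconds=i), v) for i, v in enumerate([1.0, 2.0, 6.0])]
```

Either function could then be passed as the resampling_function of a config, assuming it satisfies the protocol expected by the installed SDK version.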
resampling_period
instance-attribute
¤
resampling_period: timedelta
The resampling period.
This is the time that passes between consecutive calculations of resampled data.
It must be a positive time span.
timer_config
class-attribute
instance-attribute
¤
timer_config: WallClockTimerConfig | None = None
The custom configuration of the wall clock timer used to keep track of time.
If not provided or None, a configuration will be created by passing the resampling_period to the from_interval() method.
warn_buffer_len
class-attribute
instance-attribute
¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
The minimum length of the resampling buffer that will emit a warning.
If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.
It must be at least 1 and at most max_buffer_len
.
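The interaction between warn_buffer_len and max_buffer_len can be pictured as a clamp with two log levels. This is a minimal sketch of the documented behavior, not the SDK's code; `effective_buffer_len` is a hypothetical helper and the log messages are invented for illustration.

```python
import logging

_logger = logging.getLogger(__name__)


def effective_buffer_len(
    needed_len: int, warn_buffer_len: int, max_buffer_len: int
) -> int:
    """Return the buffer length to use for a requested length (illustrative only)."""
    if needed_len > max_buffer_len:
        # Growing past the maximum is not allowed: log an error and truncate.
        _logger.error(
            "Buffer length %d truncated to maximum %d", needed_len, max_buffer_len
        )
        return max_buffer_len
    if needed_len > warn_buffer_len:
        # Still allowed, but large enough to deserve a warning.
        _logger.warning(
            "Buffer length %d exceeds warning level %d", needed_len, warn_buffer_len
        )
    return needed_len
```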
Functions¤
__post_init__ ¤
Check that config values are valid.
RAISES | DESCRIPTION |
---|---|
ValueError
|
If any value is out of range. |
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
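The range checks described for the attributes above could be expressed roughly as follows. `ConfigSketch` is a hypothetical stand-in dataclass, its buffer-length defaults are placeholders rather than the SDK's constants, and the comparisons follow the wording of this page rather than the actual source.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class ConfigSketch:
    """Stand-in config that validates itself after construction."""

    resampling_period: timedelta
    max_data_age_in_periods: float = 3.0
    initial_buffer_len: int = 16  # placeholder default
    warn_buffer_len: int = 128  # placeholder default
    max_buffer_len: int = 1024  # placeholder default

    def __post_init__(self) -> None:
        if self.resampling_period <= timedelta(0):
            raise ValueError("resampling_period must be a positive time span")
        if self.max_data_age_in_periods <= 1.0:
            raise ValueError("max_data_age_in_periods must be bigger than 1.0")
        if self.warn_buffer_len < 1:
            raise ValueError("warn_buffer_len must be at least 1")
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError("max_buffer_len must be bigger than warn_buffer_len")
        if not 1 <= self.initial_buffer_len <= self.max_buffer_len:
            raise ValueError(
                "initial_buffer_len must be between 1 and max_buffer_len"
            )


valid = ConfigSketch(resampling_period=timedelta(seconds=1))
```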
frequenz.sdk.timeseries.ResamplingError ¤
Bases: RuntimeError
An error occurred while resampling.
This error is a container for errors raised by the underlying sources and/or sinks.
Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
Attributes¤
exceptions
instance-attribute
¤
A mapping of timeseries source and the exception encountered.
Note that the error could be raised by the sink while trying to send resampled data for this timeseries. The source key is only used to identify the timeseries with the issue; it doesn't necessarily mean that the error was raised by the source. The underlying exception should provide information about where the error actually originated.
Functions¤
__init__ ¤
__init__(
exceptions: dict[Source, Exception | CancelledError],
) -> None
Create an instance.
PARAMETER | DESCRIPTION |
---|---|
exceptions
|
A mapping of timeseries source and the exception encountered while resampling that timeseries. Note that the error could be raised by the sink while trying to send resampled data for this timeseries. The source key is only used to identify the timeseries with the issue; it doesn't necessarily mean that the error was raised by the source. The underlying exception should provide information about where the error actually originated.
TYPE:
|
Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
frequenz.sdk.timeseries.ResamplingFunction ¤
Bases: Protocol
Combine multiple samples into a new one.
A resampling function produces a new sample based on a list of pre-existing
samples. It can do "upsampling" when the period of the input_samples
is bigger than the resampling_period
, or "downsampling" if it is
smaller.
In general, a resampling window is the same as the resampling_period
, and
this function might receive input samples from multiple windows in the past to
enable extrapolation, but no samples from the future (so the timestamp of the
new sample that is going to be produced will always be bigger than the biggest
timestamp in the input data).
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
Functions¤
__call__ ¤
__call__(
input_samples: Sequence[tuple[datetime, float]],
resampler_config: ResamplerConfig,
source_properties: SourceProperties,
) -> float
Call the resampling function.
PARAMETER | DESCRIPTION |
---|---|
input_samples
|
The sequence of pre-existing samples, where the first item is the timestamp of the sample, and the second is the value of the sample. The sequence must be non-empty. |
resampler_config
|
The configuration of the resampler calling this function.
TYPE:
|
source_properties
|
The properties of the source being resampled.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
float
|
The value of new sample produced after the resampling. |
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
frequenz.sdk.timeseries.ResamplingFunction2 ¤
Bases: Protocol
Combine multiple samples into a new one.
A resampling function produces a new sample based on a list of pre-existing
samples. It can do "upsampling" when the period of the input_samples
is bigger than the resampling_period
, or "downsampling" if it is
smaller.
In general, a resampling window is the same as the resampling_period
, and
this function might receive input samples from multiple windows in the past to
enable extrapolation, but no samples from the future (so the timestamp of the
new sample that is going to be produced will always be bigger than the biggest
timestamp in the input data).
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
Functions¤
__call__ ¤
__call__(
input_samples: Sequence[tuple[datetime, float]],
resampler_config: ResamplerConfig | ResamplerConfig2,
source_properties: SourceProperties,
) -> float
Call the resampling function.
PARAMETER | DESCRIPTION |
---|---|
input_samples
|
The sequence of pre-existing samples, where the first item is the timestamp of the sample, and the second is the value of the sample. The sequence must be non-empty. |
resampler_config
|
The configuration of the resampler calling this function.
TYPE:
|
source_properties
|
The properties of the source being resampled.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
float
|
The value of new sample produced after the resampling. |
Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
frequenz.sdk.timeseries.Sample
dataclass
¤
Bases: Generic[QuantityT]
A measurement taken at a particular point in time.
The value
could be None
if a component is malfunctioning or data is
lacking for another reason, but a sample still needs to be sent to have a
coherent view on a group of component metrics for a particular timestamp.
Source code in src/frequenz/sdk/timeseries/_base_types.py
frequenz.sdk.timeseries.Sample3Phase
dataclass
¤
Bases: Generic[QuantityT]
A 3-phase measurement made at a particular point in time.
Each of the value
fields could be None
if a component is malfunctioning
or data is lacking for another reason, but a sample still needs to be sent
to have a coherent view on a group of component metrics for a particular
timestamp.
Source code in src/frequenz/sdk/timeseries/_base_types.py
Attributes¤
Functions¤
__iter__ ¤
__iter__() -> Iterator[QuantityT | None]
Return an iterator that yields values from each of the phases.
YIELDS | DESCRIPTION |
---|---|
QuantityT | None
|
Per-phase measurements one-by-one. |
Source code in src/frequenz/sdk/timeseries/_base_types.py
map ¤
Apply the given function on each of the phase values and return the result.
If a phase value is None
, replace it with default
instead.
PARAMETER | DESCRIPTION |
---|---|
function
|
The function to apply on each of the phase values.
TYPE:
|
default
|
The value to apply if a phase value is
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Self
|
A new instance, with the given function applied on values for each of the phases. |
Source code in src/frequenz/sdk/timeseries/_base_types.py
max ¤
Return the max value among all phases, or default if they are all None
.
PARAMETER | DESCRIPTION |
---|---|
default
|
value to return if all phases are
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
QuantityT | None
|
Max value among all phases, if available, default value otherwise. |
Source code in src/frequenz/sdk/timeseries/_base_types.py
min ¤
Return the min value among all phases, or default if they are all None
.
PARAMETER | DESCRIPTION |
---|---|
default
|
value to return if all phases are
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
QuantityT | None
|
Min value among all phases, if available, default value otherwise. |
Source code in src/frequenz/sdk/timeseries/_base_types.py
frequenz.sdk.timeseries.SourceProperties
dataclass
¤
Properties of a resampling source.
Source code in src/frequenz/sdk/timeseries/_resampling/_base_types.py
Attributes¤
received_samples
class-attribute
instance-attribute
¤
received_samples: int = 0
Total samples received by this source so far.
frequenz.sdk.timeseries.SourceStoppedError ¤
Bases: RuntimeError
A timeseries stopped producing samples.
Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
Attributes¤
source
instance-attribute
¤
The source of the timeseries that stopped producing samples.
Functions¤
__init__ ¤
__init__(source: Source) -> None
Create an instance.
PARAMETER | DESCRIPTION |
---|---|
source
|
The source of the timeseries that stopped producing samples.
TYPE:
|
Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
frequenz.sdk.timeseries.TickInfo
dataclass
¤
Information about a WallClockTimer
tick.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
Attributes¤
expected_tick_time
instance-attribute
¤
expected_tick_time: datetime
The expected time when the timer should have triggered.
latest_sleep_info
property
¤
latest_sleep_info: ClocksInfo | None
The clocks information from the last sleep done to trigger this tick.
If no sleeps were done, this will be None
.
sleep_infos
instance-attribute
¤
sleep_infos: Sequence[ClocksInfo]
The information about every sleep performed to trigger this tick.
If the timer didn't have to do a sleep()
to trigger the tick
(i.e. the timer is catching up because there were big drifts in previous ticks),
this will be empty.
frequenz.sdk.timeseries.WallClockTimer ¤
A timer synchronized with the wall clock.
This timer uses the wall clock to trigger ticks and handles discrepancies between the wall clock and monotonic time. Since sleeping is performed using monotonic time, differences between the two clocks can occur.
When the wall clock progresses slower than monotonic time, it is referred to as
compression (wall clock time appears in the past relative to monotonic time).
Conversely, when the wall clock progresses faster, it is called expansion
(wall clock time appears in the future relative to monotonic time). If these
differences exceed a configured threshold, a warning is emitted. The threshold
is defined by the
wall_clock_drift_tolerance_factor
.
If the difference becomes excessively large, it is treated as a time jump.
Time jumps can occur, for example, when the wall clock is adjusted by NTP after
being out of sync for an extended period. In such cases, the timer resynchronizes
with the wall clock and triggers an immediate tick. The threshold for detecting
time jumps is controlled by the
wall_clock_jump_threshold
.
The timer ensures ticks are aligned to the
align_to
configuration,
even after time jumps.
Additionally, the timer emits warnings if the actual sleep duration deviates
significantly from the requested duration. This can happen due to event loop
blocking or system overload. The tolerance for such deviations is defined by the
async_drift_tolerance
.
To account for these complexities, each tick provides a
TickInfo
object, which includes detailed
information about the clocks and their drift.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
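The terminology above (compression, expansion, time jump) can be illustrated by comparing how much time elapsed on each clock during a sleep. This is only a sketch of the vocabulary under the assumption that drift is measured as the difference between the two elapsed times; `classify_drift` is a hypothetical helper and the timer's actual bookkeeping is not shown on this page.

```python
from datetime import timedelta


def classify_drift(
    monotonic_elapsed: timedelta,
    wall_clock_elapsed: timedelta,
    jump_threshold: timedelta,
) -> str:
    """Name the kind of discrepancy between the two clocks (illustrative only)."""
    drift = wall_clock_elapsed - monotonic_elapsed
    if abs(drift) > jump_threshold:
        # Too large to adjust gradually: treated as a time jump.
        return "jump"
    if drift < timedelta(0):
        # The wall clock progressed slower than monotonic time.
        return "compression"
    if drift > timedelta(0):
        # The wall clock progressed faster than monotonic time.
        return "expansion"
    return "in sync"


second = timedelta(seconds=1)
```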
Attributes¤
interval
property
¤
interval: timedelta
The interval between timer ticks.
Since the wall clock is used, this will be added to the current time to calculate the next tick time.
Danger
In real (monotonic) time, the actual time that passes between ticks could be smaller, bigger, or even negative if the wall clock jumped back in time!
next_tick_time
property
¤
next_tick_time: datetime | None
The wall clock time when the next tick should happen, or None
if it is not running.
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self
|
This receiver, as it is already an async iterator. |
__anext__
async
¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co
|
The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration
|
If the receiver stopped producing messages. |
ReceiverError
|
If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__init__ ¤
__init__(
interval: timedelta,
config: WallClockTimerConfig | None = None,
*,
auto_start: bool = True
) -> None
Initialize this timer.
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
interval
|
The time between timer ticks. Must be positive.
TYPE:
|
config
|
The configuration for the timer. If
TYPE:
|
auto_start
|
Whether the timer should start automatically. If
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If any value is out of range. |
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
__repr__ ¤
__repr__() -> str
Return a string representation of this timer.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
close ¤
Close and stop the timer.
Once close
has been called, all subsequent calls to ready()
will immediately
return False and calls to consume()
/ receive()
or any use of the async
iterator interface will raise a
ReceiverStoppedError
.
You can restart the timer with reset()
.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
consume ¤
consume() -> TickInfo
Return the latest tick information once ready()
is complete.
Once the timer has triggered
(ready()
is done), this method
returns the information about the tick that just happened.
RETURNS | DESCRIPTION |
---|---|
TickInfo
|
The information about the tick that just happened. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
If the timer was closed via |
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
filter ¤
filter(
filter_function: Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
],
) -> Receiver[FilteredMessageT_co]
filter(
filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
),
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original
receiver. If you need to access methods of the original receiver that are
not part of the Receiver
interface you should save a reference to the
original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function
|
The function to be applied on incoming messages to determine if they should be received.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]
|
A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
],
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original
receiver. If you need to access methods of the original receiver that are
not part of the Receiver
interface you should save a reference to the
original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function
|
The function to be applied on incoming messages.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co]
|
A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready
async
¤
ready() -> bool
Wait until the timer interval
has passed.
Once a call to ready()
has finished, the resulting tick information
must be read with a call to consume()
(receive()
or iterated over)
to tell the timer it should wait for the next interval.
The timer will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the timer was started and it is still running. |
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
receive
async
¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co
|
The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
If there is some problem with the receiver. |
ReceiverError
|
If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
reset ¤
Reset the timer to start timing from now (plus an optional alignment).
If the timer was closed, or not started yet, it will be started.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select()
.
This method is used in conjunction with the
Selected
class to determine which receiver was
selected in select()
iteration.
It also works as a type guard to narrow the type of the
Selected
instance to the type of the receiver.
Please see select()
for an example.
PARAMETER | DESCRIPTION |
---|---|
selected
|
The result of a |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]]
|
Whether this receiver was selected. |
Source code in frequenz/channels/_receiver.py
frequenz.sdk.timeseries.WallClockTimerConfig
dataclass
¤
Configuration for a wall clock timer.
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
Attributes¤
align_to
class-attribute
instance-attribute
¤
align_to: datetime | None = UNIX_EPOCH
The time to align the timer to.
The first timer tick will occur at the first multiple of the timer's interval after this value.
It must be a timezone aware datetime
or None
. If None
, the timer aligns to the
time it is started.
async_drift_tolerance
class-attribute
instance-attribute
¤
async_drift_tolerance: timedelta | None = None
The maximum allowed difference between the requested and the real sleep time.
The timer will emit a warning if the difference is bigger than this value.
It must be bigger than 0 or None
. If None
, no warnings will ever be emitted.
wall_clock_drift_tolerance_factor
class-attribute
instance-attribute
¤
wall_clock_drift_tolerance_factor: float | None = None
The maximum allowed relative difference between the wall clock and monotonic time.
The timer will emit a warning if the relative difference is bigger than this value. If the difference remains constant, the warning will be emitted only once, as the previous drift is taken into account. If there is information on the previous drift, the previous and current factor will be used to determine if a warning should be emitted.
It must be bigger than 0 or None. If None, no warnings will ever be emitted.
Info
The calculation is as follows:
tolerance = wall_clock_drift_tolerance_factor
factor = monotonic_elapsed / wall_clock_elapsed
previous_factor = previous_monotonic_elapsed / previous_wall_clock_elapsed
if abs(factor - previous_factor) > tolerance:
    emit warning
If there is no previous information, a previous_factor
of 1.0 will be used.
wall_clock_jump_threshold
class-attribute
instance-attribute
¤
wall_clock_jump_threshold: timedelta | None = None
The amount of time that's considered a wall clock jump.
When the drift between the wall clock and monotonic time is too big, it is considered a time jump and the timer will be resynced to the wall clock.
This value determines how big the difference needs to be to be considered a jump.
Smaller values are considered wall clock expansions or compressions and are always gradually adjusted, instead of triggering a resync.
Must be bigger than 0 or None
. If None
, a resync will never be triggered due to
time jumps.
Functions¤
__post_init__ ¤
Check that config values are valid.
RAISES | DESCRIPTION |
---|---|
ValueError
|
If any value is out of range. |
Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
from_interval
classmethod
¤
from_interval(
interval: timedelta,
*,
align_to: datetime | None = UNIX_EPOCH,
async_drift_tolerance_factor: float = 0.1,
wall_clock_drift_tolerance_factor: float = 0.1,
wall_clock_jump_threshold_factor: float = 1.0
) -> Self
Create a timer configuration based on an interval.
This will set the tolerance and threshold values proportionally to the interval.
PARAMETER | DESCRIPTION |
---|---|
interval
|
The interval between timer ticks. Must be bigger than 0.
TYPE:
|
align_to
|
The time to align the timer to. See the
TYPE:
|
async_drift_tolerance_factor
|
The maximum allowed difference between the
requested and the real sleep time, relative to the interval.
TYPE:
|
wall_clock_drift_tolerance_factor
|
The maximum allowed relative difference
between the wall clock and monotonic time. See the
TYPE:
|
wall_clock_jump_threshold_factor
|
The amount of time that's considered a
wall clock jump, relative to the interval. This will be set to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Self
|
The created timer configuration. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
If any value is out of range. |
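"Proportionally to the interval" can be read as multiplying the interval by each factor. The sketch below shows that reading for the two time-based values; it is an assumption about how the values are derived, and `derive_thresholds` is a hypothetical helper, not the classmethod itself.

```python
from datetime import timedelta


def derive_thresholds(
    interval: timedelta,
    *,
    async_drift_tolerance_factor: float = 0.1,
    wall_clock_jump_threshold_factor: float = 1.0,
) -> tuple[timedelta, timedelta]:
    """Return (async drift tolerance, wall clock jump threshold), illustrative only."""
    if interval <= timedelta(0):
        raise ValueError("interval must be bigger than 0")
    # Assumption: each factor simply scales the interval.
    return (
        interval * async_drift_tolerance_factor,
        interval * wall_clock_jump_threshold_factor,
    )


tolerance, jump_threshold = derive_thresholds(timedelta(seconds=10))
```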