Skip to content

notification_service

frequenz.lib.notebooks.notification_service ¤

This module provides a notification service for sending alert notifications.

The service supports sending email with optional attachments. It also provides a scheduler for sending periodic notifications with configurable intervals and durations. The service is designed to handle retries and backoff for failed notification attempts.

Example usage¤

Configuration for email notification¤

email_config = EmailConfig( subject="Critical Alert", message="Inverter is in error mode", recipients=["recipient@example.com"], smtp_server="smtp.example.com", smtp_port=587, smtp_user="user@example.com", smtp_password="password", from_email="alert@example.com", attachments=["alert_records.csv"], scheduler=SchedulerConfig( send_immediately=True, interval=60, duration=3600, ), )

email_config_dict = { "subject": "Critical Alert", "message": "Inverter is in error mode", "recipients": ["recipient@example.com"], "smtp_server": "smtp.example.com", "smtp_port": 587, "smtp_user": "user@example.com", "smtp_password": "password", "from_email": "alert@example.com", "attachments": ["alert_records.csv"], "scheduler": { "send_immediately": True, "interval": 60, "duration": 3600, }, }

Create notification objects¤

email_notification = EmailNotification(config=email_config) email_notification_2 = EmailNotification(config=EmailConfig.from_dict(email_config_dict))

Send one-off notification¤

email_notification.send()

Start periodic notifications¤

email_notification.start_scheduler()

Stop the scheduler after some time if needed¤

time.sleep(300) email_notification.stop_scheduler()

Classes¤

frequenz.lib.notebooks.notification_service.BaseNotification ¤

Base class for all notification types.

Subclasses must implement the send method.

Source code in frequenz/lib/notebooks/notification_service.py
class BaseNotification:
    """Base class for all notification types.

    Holds an optional scheduler and provides the retry helper shared by all
    notification types. Subclasses must implement the `send` method.
    """

    def __init__(self) -> None:
        """Initialise the notification object with no scheduler attached."""
        self._scheduler: Scheduler | None = None

    @staticmethod
    def send_with_retry(
        *,
        send_func: Callable[..., None],
        retries: int,
        backoff_factor: int,
        max_sleep: int,
        **kwargs: Any,
    ) -> None:
        """Attempt to execute the `send_func` with retries and backoff.

        `send_func` is called up to `retries + 1` times (one initial attempt
        plus `retries` retries). After each failed attempt that will be
        followed by another one, the call sleeps for
        `min(max_sleep, backoff_factor * attempt_number)` seconds.

        Exceptions raised by `send_func` are logged and swallowed; if every
        attempt fails, an error is logged and the method returns normally.

        Args:
            send_func: The function to execute (e.g., send_email_alert).
            retries: Number of retry attempts after the first failure.
            backoff_factor: Delay factor for (linear) backoff calculation.
            max_sleep: Maximum sleep time in seconds.
            **kwargs: Keyword arguments for the send_func.
        """
        for attempt in range(retries + 1):
            try:
                send_func(**kwargs)
                _log.info("Successfully sent notification on attempt %d", attempt + 1)
                return
            except Exception as e:  # pylint: disable=broad-except
                _log.error("Attempt %d failed: %s", attempt + 1, e)
                # Back off before every retry, including the last one. The
                # loop runs `retries + 1` times, so only the final attempt
                # (attempt == retries) has nothing left to wait for.
                if attempt < retries:
                    linear_backoff = backoff_factor * (attempt + 1)
                    time.sleep(min(max_sleep, linear_backoff))
        _log.error("Failed to send notification after %d retries", retries)

    def start_scheduler(self) -> None:
        """Start the scheduler if configured.

        The scheduler is handed `self.send` as the task to run. If no
        scheduler is attached, a warning is logged and nothing is started.
        """
        if self._scheduler:
            _log.info("Starting scheduler for %s", self.__class__.__name__)
            self._scheduler.start(self.send)
        else:
            _log.warning("No scheduler config provided. Cannot start scheduler.")

    def stop_scheduler(self) -> None:
        """Stop the running scheduler.

        If no scheduler is attached, a warning is logged and nothing happens.
        """
        if not self._scheduler:
            _log.warning("No active scheduler to stop.")
            return
        _log.info("Stopping scheduler for notification: %s", self.__class__.__name__)
        self._scheduler.stop()

    @abstractmethod
    def send(self) -> None:
        """Send the notification. To be implemented by subclasses.

        Raises:
            NotImplementedError: If the method is not implemented by the subclass.
        """
        # This class does not derive from `abc.ABC`, so the decorator alone
        # does not block instantiation; this raise is the runtime guard.
        raise NotImplementedError("Subclasses must implement the send method.")
Functions¤
__init__ ¤
__init__() -> None

Initialise the notification object.

Source code in frequenz/lib/notebooks/notification_service.py
def __init__(self) -> None:
    """Initialise the notification object.

    The scheduler slot starts out empty (`None`).
    """
    self._scheduler: Scheduler | None = None
send abstractmethod ¤
send() -> None

Send the notification. To be implemented by subclasses.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented by the subclass.

Source code in frequenz/lib/notebooks/notification_service.py
@abstractmethod
def send(self) -> None:
    """Send the notification. To be implemented by subclasses.

    This base implementation does nothing but raise.

    Raises:
        NotImplementedError: If the method is not implemented by the subclass.
    """
    raise NotImplementedError("Subclasses must implement the send method.")
send_with_retry staticmethod ¤
send_with_retry(
    *,
    send_func: Callable[..., None],
    retries: int,
    backoff_factor: int,
    max_sleep: int,
    **kwargs: Any
) -> None

Attempt to execute the send_func with retries and backoff.

PARAMETER DESCRIPTION
send_func

The function to execute (e.g., send_email_alert).

TYPE: Callable[..., None]

retries

Number of retry attempts after the first failure.

TYPE: int

backoff_factor

Delay factor for (linear) backoff calculation.

TYPE: int

max_sleep

Maximum sleep time in seconds.

TYPE: int

**kwargs

Keyword arguments for the send_func.

TYPE: Any DEFAULT: {}

Source code in frequenz/lib/notebooks/notification_service.py
@staticmethod
def send_with_retry(
    *,
    send_func: Callable[..., None],
    retries: int,
    backoff_factor: int,
    max_sleep: int,
    **kwargs: Any,
) -> None:
    """Attempt to execute the `send_func` with retries and backoff.

    `send_func` is called up to `retries + 1` times (one initial attempt plus
    `retries` retries). After each failed attempt that will be followed by
    another one, the call sleeps for
    `min(max_sleep, backoff_factor * attempt_number)` seconds.

    Exceptions raised by `send_func` are logged and swallowed; if every
    attempt fails, an error is logged and the method returns normally.

    Args:
        send_func: The function to execute (e.g., send_email_alert).
        retries: Number of retry attempts after the first failure.
        backoff_factor: Delay factor for (linear) backoff calculation.
        max_sleep: Maximum sleep time in seconds.
        **kwargs: Keyword arguments for the send_func.
    """
    for attempt in range(retries + 1):
        try:
            send_func(**kwargs)
            _log.info("Successfully sent notification on attempt %d", attempt + 1)
            return
        except Exception as e:  # pylint: disable=broad-except
            _log.error("Attempt %d failed: %s", attempt + 1, e)
            # Back off before every retry, including the last one. The loop
            # runs `retries + 1` times, so only the final attempt
            # (attempt == retries) has nothing left to wait for.
            if attempt < retries:
                linear_backoff = backoff_factor * (attempt + 1)
                time.sleep(min(max_sleep, linear_backoff))
    _log.error("Failed to send notification after %d retries", retries)
start_scheduler ¤
start_scheduler() -> None

Start the scheduler if configured.

Source code in frequenz/lib/notebooks/notification_service.py
def start_scheduler(self) -> None:
    """Kick off periodic sending, provided a scheduler is attached.

    The scheduler receives `self.send` as the task to run. Without a
    scheduler only a warning is logged.
    """
    if not self._scheduler:
        _log.warning("No scheduler config provided. Cannot start scheduler.")
        return

    _log.info("Starting scheduler for %s", self.__class__.__name__)
    self._scheduler.start(self.send)
stop_scheduler ¤
stop_scheduler() -> None

Stop the running scheduler.

Source code in frequenz/lib/notebooks/notification_service.py
def stop_scheduler(self) -> None:
    """Ask the attached scheduler to stop.

    Without a scheduler only a warning is logged.
    """
    if self._scheduler:
        _log.info(
            "Stopping scheduler for notification: %s", self.__class__.__name__
        )
        self._scheduler.stop()
    else:
        _log.warning("No active scheduler to stop.")

frequenz.lib.notebooks.notification_service.BaseNotificationConfig dataclass ¤

Bases: FromDictMixin

Base configuration for notifications.

Source code in frequenz/lib/notebooks/notification_service.py
@dataclass
class BaseNotificationConfig(FromDictMixin):
    """Base configuration for notifications.

    Only `subject` and `message` have no default. Every field carries a
    `metadata` mapping with a human-readable description and, where
    applicable, a `required` flag or a `validate` predicate.

    NOTE(review): nothing in this class calls the `validate` predicates or
    inspects the `required` flags; presumably a consumer elsewhere does --
    confirm before relying on them.
    """

    # Subject line / title of the notification (no default).
    subject: str = field(
        metadata={
            "description": "Subject or title of the notification",
            "required": True,
        },
    )

    # Body of the notification (no default).
    message: str = field(
        metadata={
            "description": "Message content of the notification",
            "required": True,
        },
    )

    # Number of retries after the first failed attempt. The predicate accepts
    # 2..10 only.
    # NOTE(review): `1 < x` rejects a single retry -- confirm this is intended.
    retries: int = field(
        default=3,
        metadata={
            "description": "Number of retry attempts after the first failure",
            "validate": lambda x: 1 < x <= 10,
        },
    )

    # Multiplier for the back-off delay; the predicate requires it positive.
    backoff_factor: int = field(
        default=3,
        metadata={
            "description": "Delay factor for backoff calculation",
            "validate": lambda x: x > 0,
        },
    )

    # Upper bound, in seconds, for the sleep between retries; the predicate
    # accepts values in (0, 60].
    max_retry_sleep: int = field(
        default=30,
        metadata={
            "description": (
                "Maximum sleep time between retries in seconds unless a scheduler "
                "is used in which case it is capped at the minimum of the interval "
                "and this value"
            ),
            "validate": lambda x: 0 < x <= 60,
        },
    )

    # Optional list of file paths to attach; `None` means no attachments.
    attachments: list[str] | None = field(
        default=None,
        metadata={
            "description": "List of files to attach to the notification",
        },
    )

    # Optional scheduler settings; `None` means no periodic sending.
    scheduler: SchedulerConfig | None = field(
        default=None,
        metadata={
            "description": "Configuration for the scheduler",
        },
    )
Functions¤
from_dict classmethod ¤
from_dict(data: dict[str, Any]) -> DataclassT

Create an instance of the dataclass from a dictionary.

This method handles
  • Standard fields: Assigns values directly.
  • Nested dataclasses (that also inherit FromDictMixin): Recursively converts dictionaries into dataclass instances.
  • Optional fields with union types: Extracts the dataclass type from the union if present and handles None values.
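  • Input validation: Raises TypeError if the input is not a dictionary or the class is not a dataclass. Keys that do not match a field are ignored, and individual field values are not type-checked.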
PARAMETER DESCRIPTION
data

The data dictionary to be mapped to the dataclass.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DataclassT

An instance of the dataclass.

RAISES DESCRIPTION
TypeError

If the input data is not a dictionary or cls is not a dataclass.

Source code in frequenz/lib/notebooks/notification_service.py
@classmethod
def from_dict(cls: type[DataclassT], data: dict[str, Any]) -> DataclassT:
    """Build an instance of the dataclass from a plain dictionary.

    How entries of `data` are treated:
        - Keys that do not name a field of the dataclass are ignored.
        - A `None` value for a union-typed field is passed through as `None`.
        - A dict value whose field type is (or, for a union, contains) a
            dataclass inheriting FromDictMixin is converted recursively.
        - Any other value is passed to the constructor unchanged; individual
            values are not type-checked.

    Args:
        data: The data dictionary to be mapped to the dataclass.

    Returns:
        An instance of the dataclass.

    Raises:
        TypeError: If the input data is not a dictionary or `cls` is not a
            dataclass.
    """

    def is_union(t: Any) -> bool:
        """Tell whether `t` is an `X | Y` or `typing.Union[...]` annotation."""
        if isinstance(t, UnionType):
            return True
        return get_origin(t) is Union

    if not isinstance(data, dict):
        raise TypeError(
            f"Expected a dictionary to create {cls.__name__}, got {type(data)}."
        )
    if not is_dataclass(cls):
        raise TypeError(f"{cls.__name__} is not a dataclass.")

    declared = {f.name: f.type for f in fields(cls)}
    init_kwargs = {}

    for name, raw in data.items():
        if name not in declared:
            continue
        target = declared[name]

        if is_union(target):
            if raw is None:
                init_kwargs[name] = None
                continue
            # Narrow the union to its first FromDictMixin dataclass member;
            # keep the union itself when there is none.
            target = next(
                (
                    member
                    for member in get_args(target)
                    if member is not type(None)
                    and is_dataclass(member)
                    and issubclass(member, FromDictMixin)
                ),
                target,
            )

        is_nested = (
            is_dataclass(target)
            and isinstance(raw, dict)
            and issubclass(target, FromDictMixin)
        )
        init_kwargs[name] = target.from_dict(raw) if is_nested else raw

    return cls(**init_kwargs)

frequenz.lib.notebooks.notification_service.EmailConfig dataclass ¤

Bases: BaseNotificationConfig

Configuration for sending email notifications.

Source code in frequenz/lib/notebooks/notification_service.py
@dataclass(kw_only=True)
class EmailConfig(BaseNotificationConfig):
    """Settings needed to deliver a notification by email.

    Extends the base notification settings with the SMTP connection details,
    the sender address and the recipient list. All fields are keyword-only
    and none of them may be empty.
    """

    smtp_server: str = field(
        metadata={"description": "SMTP server address", "required": True},
    )

    smtp_port: int = field(
        metadata={"description": "SMTP server port", "required": True},
    )

    smtp_user: str = field(
        metadata={"description": "SMTP server username", "required": True},
    )

    smtp_password: str = field(
        metadata={"description": "SMTP server password", "required": True},
    )

    from_email: str = field(
        metadata={"description": "Email address of the sender", "required": True},
    )

    recipients: list[str] = field(
        metadata={
            "description": "List of email addresses as recipients",
            "required": True,
        },
    )

    def __post_init__(self) -> None:
        """Reject a configuration in which any mandatory field is falsy.

        Fields are checked in declaration order and the first empty one is
        reported.

        Raises:
            ValueError: If one of the mandatory fields is empty.
        """
        mandatory = (
            "smtp_server",
            "smtp_port",
            "smtp_user",
            "smtp_password",
            "from_email",
            "recipients",
        )
        for name in mandatory:
            if not getattr(self, name):
                raise ValueError(f"{name} is required and cannot be empty.")
Functions¤
__post_init__ ¤
__post_init__() -> None

Validate required fields that must not be empty.

Source code in frequenz/lib/notebooks/notification_service.py
def __post_init__(self) -> None:
    """Reject a configuration in which any mandatory field is falsy.

    Fields are checked in a fixed order and the first empty one is reported.

    Raises:
        ValueError: If one of the mandatory fields is empty.
    """
    mandatory = (
        "smtp_server",
        "smtp_port",
        "smtp_user",
        "smtp_password",
        "from_email",
        "recipients",
    )
    for name in mandatory:
        if not getattr(self, name):
            raise ValueError(f"{name} is required and cannot be empty.")
from_dict classmethod ¤
from_dict(data: dict[str, Any]) -> DataclassT

Create an instance of the dataclass from a dictionary.

This method handles
  • Standard fields: Assigns values directly.
  • Nested dataclasses (that also inherit FromDictMixin): Recursively converts dictionaries into dataclass instances.
  • Optional fields with union types: Extracts the dataclass type from the union if present and handles None values.
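  • Input validation: Raises TypeError if the input is not a dictionary or the class is not a dataclass. Keys that do not match a field are ignored, and individual field values are not type-checked.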
PARAMETER DESCRIPTION
data

The data dictionary to be mapped to the dataclass.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DataclassT

An instance of the dataclass.

RAISES DESCRIPTION
TypeError

If the input data is not a dictionary or cls is not a dataclass.

Source code in frequenz/lib/notebooks/notification_service.py
@classmethod
def from_dict(cls: type[DataclassT], data: dict[str, Any]) -> DataclassT:
    """Build an instance of the dataclass from a plain dictionary.

    How entries of `data` are treated:
        - Keys that do not name a field of the dataclass are ignored.
        - A `None` value for a union-typed field is passed through as `None`.
        - A dict value whose field type is (or, for a union, contains) a
            dataclass inheriting FromDictMixin is converted recursively.
        - Any other value is passed to the constructor unchanged; individual
            values are not type-checked.

    Args:
        data: The data dictionary to be mapped to the dataclass.

    Returns:
        An instance of the dataclass.

    Raises:
        TypeError: If the input data is not a dictionary or `cls` is not a
            dataclass.
    """

    def is_union(t: Any) -> bool:
        """Tell whether `t` is an `X | Y` or `typing.Union[...]` annotation."""
        if isinstance(t, UnionType):
            return True
        return get_origin(t) is Union

    if not isinstance(data, dict):
        raise TypeError(
            f"Expected a dictionary to create {cls.__name__}, got {type(data)}."
        )
    if not is_dataclass(cls):
        raise TypeError(f"{cls.__name__} is not a dataclass.")

    declared = {f.name: f.type for f in fields(cls)}
    init_kwargs = {}

    for name, raw in data.items():
        if name not in declared:
            continue
        target = declared[name]

        if is_union(target):
            if raw is None:
                init_kwargs[name] = None
                continue
            # Narrow the union to its first FromDictMixin dataclass member;
            # keep the union itself when there is none.
            target = next(
                (
                    member
                    for member in get_args(target)
                    if member is not type(None)
                    and is_dataclass(member)
                    and issubclass(member, FromDictMixin)
                ),
                target,
            )

        is_nested = (
            is_dataclass(target)
            and isinstance(raw, dict)
            and issubclass(target, FromDictMixin)
        )
        init_kwargs[name] = target.from_dict(raw) if is_nested else raw

    return cls(**init_kwargs)

frequenz.lib.notebooks.notification_service.EmailNotification ¤

Bases: BaseNotification

Handles email notifications.

This class sends HTML emails with optional attachments. It uses the smtplib library to connect to an SMTP server and send the email.

Source code in frequenz/lib/notebooks/notification_service.py
class EmailNotification(BaseNotification):
    """Handles email notifications.

    This class sends HTML emails with optional attachments. It uses the smtplib
    library to connect to an SMTP server and send the email.
    """

    def __init__(self, config: EmailConfig) -> None:
        """Initialise the email notification with configuration.

        When the configuration carries scheduler settings, a scheduler is
        created from them so that `start_scheduler` has something to start.

        Args:
            config: Configuration for email notifications.
        """
        super().__init__()
        self._config: EmailConfig = config
        if config.scheduler:
            self._scheduler = Scheduler(config=config.scheduler)

    def send(self) -> None:
        """Send the email notification.

        Delegates to `send_with_retry` with the retry settings and message
        details taken from the configuration. The sleep between retries is
        capped at `max_retry_sleep`, and additionally at the scheduler
        interval when a scheduler is configured.
        """
        max_sleep = self._config.max_retry_sleep
        if self._config.scheduler:
            # Per the `max_retry_sleep` field description, the cap is the
            # minimum of the scheduler interval and `max_retry_sleep`.
            max_sleep = min(self._config.scheduler.interval, max_sleep)

        self.send_with_retry(
            send_func=self._send_email,
            retries=self._config.retries,
            backoff_factor=self._config.backoff_factor,
            max_sleep=max_sleep,
            subject=self._config.subject,
            html_body=self._config.message,
            to_emails=self._config.recipients,
            from_email=self._config.from_email,
            smtp_server=self._config.smtp_server,
            smtp_port=self._config.smtp_port,
            smtp_user=self._config.smtp_user,
            smtp_password=self._config.smtp_password,
            attachments=self._config.attachments,
        )

    @staticmethod
    def _send_email(  # pylint: disable=too-many-arguments
        *,
        subject: str,
        html_body: str,
        to_emails: list[str],
        from_email: str,
        smtp_server: str,
        smtp_port: int,
        smtp_user: str,
        smtp_password: str,
        attachments: list[str] | None = None,
    ) -> None:
        """Send an HTML email alert with optional attachments.

        Attachments are best-effort: a file that cannot be read is logged and
        skipped, and the email is still sent without it.

        Args:
            subject: Email subject.
            html_body: HTML body content for the email.
            to_emails: List of recipient email addresses.
            from_email: Sender email address.
            smtp_server: SMTP server address.
            smtp_port: SMTP server port.
            smtp_user: SMTP login username.
            smtp_password: SMTP login password.
            attachments: List of files to attach.

        Raises:
            SMTPException: If the email fails to send.
        """
        msg = EmailMessage()
        msg["From"] = from_email
        msg["To"] = ", ".join(to_emails)
        msg["Subject"] = subject
        msg.add_alternative(html_body, subtype="html")

        if attachments:
            for file in attachments:
                try:
                    with open(file, "rb") as f:
                        # Bytes content needs both maintype and subtype;
                        # omitting maintype raises TypeError.
                        msg.add_attachment(
                            f.read(),
                            maintype="application",
                            subtype="octet-stream",
                            filename=os.path.basename(file),
                        )
                except OSError as e:
                    _log.error("Failed to attach file %s: %s", file, e)

        try:
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(smtp_user, smtp_password)
                server.send_message(msg)
            _log.info("Email sent successfully to %s", to_emails)
        except SMTPException as e:
            _log.error("Failed to send email: %s", e)
            raise
Functions¤
__init__ ¤
__init__(config: EmailConfig) -> None

Initialise the email notification with configuration.

PARAMETER DESCRIPTION
config

Configuration for email notifications.

TYPE: EmailConfig

Source code in frequenz/lib/notebooks/notification_service.py
def __init__(self, config: EmailConfig) -> None:
    """Initialise the email notification with configuration.

    When the configuration carries scheduler settings, a scheduler is created
    from them so that `start_scheduler` has something to start.

    Args:
        config: Configuration for email notifications.
    """
    super().__init__()
    self._config: EmailConfig = config
    if config.scheduler:
        self._scheduler = Scheduler(config=config.scheduler)
send ¤
send() -> None

Send the email notification.

Source code in frequenz/lib/notebooks/notification_service.py
def send(self) -> None:
    """Send the email notification.

    Delegates to `send_with_retry` with the retry settings and message
    details taken from the configuration. The sleep between retries is capped
    at `max_retry_sleep`, and additionally at the scheduler interval when a
    scheduler is configured.
    """
    max_sleep = self._config.max_retry_sleep
    if self._config.scheduler:
        # Per the `max_retry_sleep` field description, the cap is the minimum
        # of the scheduler interval and `max_retry_sleep`.
        max_sleep = min(self._config.scheduler.interval, max_sleep)

    self.send_with_retry(
        send_func=self._send_email,
        retries=self._config.retries,
        backoff_factor=self._config.backoff_factor,
        max_sleep=max_sleep,
        subject=self._config.subject,
        html_body=self._config.message,
        to_emails=self._config.recipients,
        from_email=self._config.from_email,
        smtp_server=self._config.smtp_server,
        smtp_port=self._config.smtp_port,
        smtp_user=self._config.smtp_user,
        smtp_password=self._config.smtp_password,
        attachments=self._config.attachments,
    )
send_with_retry staticmethod ¤
send_with_retry(
    *,
    send_func: Callable[..., None],
    retries: int,
    backoff_factor: int,
    max_sleep: int,
    **kwargs: Any
) -> None

Attempt to execute the send_func with retries and backoff.

PARAMETER DESCRIPTION
send_func

The function to execute (e.g., send_email_alert).

TYPE: Callable[..., None]

retries

Number of retry attempts after the first failure.

TYPE: int

backoff_factor

Delay factor for (linear) backoff calculation.

TYPE: int

max_sleep

Maximum sleep time in seconds.

TYPE: int

**kwargs

Keyword arguments for the send_func.

TYPE: Any DEFAULT: {}

Source code in frequenz/lib/notebooks/notification_service.py
@staticmethod
def send_with_retry(
    *,
    send_func: Callable[..., None],
    retries: int,
    backoff_factor: int,
    max_sleep: int,
    **kwargs: Any,
) -> None:
    """Attempt to execute the `send_func` with retries and backoff.

    `send_func` is called up to `retries + 1` times (one initial attempt plus
    `retries` retries). After each failed attempt that will be followed by
    another one, the call sleeps for
    `min(max_sleep, backoff_factor * attempt_number)` seconds.

    Exceptions raised by `send_func` are logged and swallowed; if every
    attempt fails, an error is logged and the method returns normally.

    Args:
        send_func: The function to execute (e.g., send_email_alert).
        retries: Number of retry attempts after the first failure.
        backoff_factor: Delay factor for (linear) backoff calculation.
        max_sleep: Maximum sleep time in seconds.
        **kwargs: Keyword arguments for the send_func.
    """
    for attempt in range(retries + 1):
        try:
            send_func(**kwargs)
            _log.info("Successfully sent notification on attempt %d", attempt + 1)
            return
        except Exception as e:  # pylint: disable=broad-except
            _log.error("Attempt %d failed: %s", attempt + 1, e)
            # Back off before every retry, including the last one. The loop
            # runs `retries + 1` times, so only the final attempt
            # (attempt == retries) has nothing left to wait for.
            if attempt < retries:
                linear_backoff = backoff_factor * (attempt + 1)
                time.sleep(min(max_sleep, linear_backoff))
    _log.error("Failed to send notification after %d retries", retries)
start_scheduler ¤
start_scheduler() -> None

Start the scheduler if configured.

Source code in frequenz/lib/notebooks/notification_service.py
def start_scheduler(self) -> None:
    """Kick off periodic sending, provided a scheduler is attached.

    The scheduler receives `self.send` as the task to run. Without a
    scheduler only a warning is logged.
    """
    if not self._scheduler:
        _log.warning("No scheduler config provided. Cannot start scheduler.")
        return

    _log.info("Starting scheduler for %s", self.__class__.__name__)
    self._scheduler.start(self.send)
stop_scheduler ¤
stop_scheduler() -> None

Stop the running scheduler.

Source code in frequenz/lib/notebooks/notification_service.py
def stop_scheduler(self) -> None:
    """Ask the attached scheduler to stop.

    Without a scheduler only a warning is logged.
    """
    if self._scheduler:
        _log.info(
            "Stopping scheduler for notification: %s", self.__class__.__name__
        )
        self._scheduler.stop()
    else:
        _log.warning("No active scheduler to stop.")

frequenz.lib.notebooks.notification_service.FromDictMixin ¤

A mixin to add a from_dict class method for dataclasses.

Source code in frequenz/lib/notebooks/notification_service.py
class FromDictMixin:
    """Mixin giving dataclasses a `from_dict` alternate constructor."""

    @classmethod
    def from_dict(cls: type[DataclassT], data: dict[str, Any]) -> DataclassT:
        """Build an instance of the dataclass from a plain dictionary.

        How entries of `data` are treated:
            - Keys that do not name a field of the dataclass are ignored.
            - A `None` value for a union-typed field is passed through as
                `None`.
            - A dict value whose field type is (or, for a union, contains) a
                dataclass inheriting FromDictMixin is converted recursively.
            - Any other value is passed to the constructor unchanged;
                individual values are not type-checked.

        Args:
            data: The data dictionary to be mapped to the dataclass.

        Returns:
            An instance of the dataclass.

        Raises:
            TypeError: If the input data is not a dictionary or `cls` is not a
                dataclass.
        """

        def is_union(t: Any) -> bool:
            """Tell whether `t` is an `X | Y` or `typing.Union[...]` annotation."""
            if isinstance(t, UnionType):
                return True
            return get_origin(t) is Union

        if not isinstance(data, dict):
            raise TypeError(
                f"Expected a dictionary to create {cls.__name__}, got {type(data)}."
            )
        if not is_dataclass(cls):
            raise TypeError(f"{cls.__name__} is not a dataclass.")

        declared = {f.name: f.type for f in fields(cls)}
        init_kwargs = {}

        for name, raw in data.items():
            if name not in declared:
                continue
            target = declared[name]

            if is_union(target):
                if raw is None:
                    init_kwargs[name] = None
                    continue
                # Narrow the union to its first FromDictMixin dataclass
                # member; keep the union itself when there is none.
                target = next(
                    (
                        member
                        for member in get_args(target)
                        if member is not type(None)
                        and is_dataclass(member)
                        and issubclass(member, FromDictMixin)
                    ),
                    target,
                )

            is_nested = (
                is_dataclass(target)
                and isinstance(raw, dict)
                and issubclass(target, FromDictMixin)
            )
            init_kwargs[name] = target.from_dict(raw) if is_nested else raw

        return cls(**init_kwargs)
Functions¤
from_dict classmethod ¤
from_dict(data: dict[str, Any]) -> DataclassT

Create an instance of the dataclass from a dictionary.

This method handles
  • Standard fields: Assigns values directly.
  • Nested dataclasses (that also inherit FromDictMixin): Recursively converts dictionaries into dataclass instances.
  • Optional fields with union types: Extracts the dataclass type from the union if present and handles None values.
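  • Input validation: Raises TypeError if the input is not a dictionary or the class is not a dataclass. Keys that do not match a field are ignored, and individual field values are not type-checked.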
PARAMETER DESCRIPTION
data

The data dictionary to be mapped to the dataclass.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DataclassT

An instance of the dataclass.

RAISES DESCRIPTION
TypeError

If the input data is not a dictionary or cls is not a dataclass.

Source code in frequenz/lib/notebooks/notification_service.py
@classmethod
def from_dict(cls: type[DataclassT], data: dict[str, Any]) -> DataclassT:
    """Build an instance of the dataclass from a plain dictionary.

    How entries of `data` are treated:
        - Keys that do not name a field of the dataclass are ignored.
        - A `None` value for a union-typed field is passed through as `None`.
        - A dict value whose field type is (or, for a union, contains) a
            dataclass inheriting FromDictMixin is converted recursively.
        - Any other value is passed to the constructor unchanged; individual
            values are not type-checked.

    Args:
        data: The data dictionary to be mapped to the dataclass.

    Returns:
        An instance of the dataclass.

    Raises:
        TypeError: If the input data is not a dictionary or `cls` is not a
            dataclass.
    """

    def is_union(t: Any) -> bool:
        """Tell whether `t` is an `X | Y` or `typing.Union[...]` annotation."""
        if isinstance(t, UnionType):
            return True
        return get_origin(t) is Union

    if not isinstance(data, dict):
        raise TypeError(
            f"Expected a dictionary to create {cls.__name__}, got {type(data)}."
        )
    if not is_dataclass(cls):
        raise TypeError(f"{cls.__name__} is not a dataclass.")

    declared = {f.name: f.type for f in fields(cls)}
    init_kwargs = {}

    for name, raw in data.items():
        if name not in declared:
            continue
        target = declared[name]

        if is_union(target):
            if raw is None:
                init_kwargs[name] = None
                continue
            # Narrow the union to its first FromDictMixin dataclass member;
            # keep the union itself when there is none.
            target = next(
                (
                    member
                    for member in get_args(target)
                    if member is not type(None)
                    and is_dataclass(member)
                    and issubclass(member, FromDictMixin)
                ),
                target,
            )

        is_nested = (
            is_dataclass(target)
            and isinstance(raw, dict)
            and issubclass(target, FromDictMixin)
        )
        init_kwargs[name] = target.from_dict(raw) if is_nested else raw

    return cls(**init_kwargs)

frequenz.lib.notebooks.notification_service.Scheduler ¤

Utility class for scheduling periodic tasks.

Source code in frequenz/lib/notebooks/notification_service.py
class Scheduler:
    """Utility class for scheduling periodic tasks.

    The task is executed on a daemon background thread. After every
    execution the thread sleeps for the remainder of the configured interval.
    The loop ends when `stop` is called or, if a duration is configured, once
    that many seconds have elapsed since the thread started running.
    """

    def __init__(self, config: SchedulerConfig) -> None:
        """Initialise the scheduler.

        Args:
            config: Configuration for the scheduler.
        """
        self._config = config
        self.task: Callable[..., None] | None = None
        self._task_name: str | None = None
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        # Wall-clock time of the most recent wake-up from an interval sleep.
        # Kept for diagnostics only; it is not part of the duration check.
        self._time_awoke: float = 0.0

    def start(self, task: Callable[..., None], **kwargs: Any) -> None:
        """Start the scheduler for a given task.

        If a previous run is still active, the request is ignored (with a
        warning) so that two threads never execute the task concurrently.
        A scheduler that was stopped, or whose duration expired, can be
        started again.

        Args:
            task: The task to execute periodically.
            **kwargs: Arguments to pass to the task.
        """
        if self._thread is not None and self._thread.is_alive():
            _log.warning(
                "Scheduler for task '%s' is already running; ignoring start request.",
                self._task_name,
            )
            return

        self.task = task
        # Not every callable has __name__ (e.g. functools.partial objects).
        self._task_name = getattr(task, "__name__", repr(task))
        # A previous stop() leaves the event set; without clearing it the new
        # thread would exit straight away.
        self._stop_event.clear()
        _log.info(
            "Starting scheduler for task '%s' to execute every %d seconds and %s",
            self._task_name,
            self._config.interval,
            (
                f"for {self._config.duration} seconds"
                if self._config.duration is not None
                else "indefinitely"
            ),
        )
        self._thread = threading.Thread(
            target=self._run_task, args=(kwargs,), daemon=True
        )
        self._thread.start()

    def stop(self) -> None:
        """Stop the scheduler.

        This only signals the background thread and returns immediately; the
        thread ends as soon as it observes the signal (at the latest after the
        task execution that is currently in progress).
        """
        thread = self._thread
        if thread is None:
            _log.warning(
                "Attempted to stop scheduler for %s, but no active thread was found.",
                self._task_name,
            )
            return
        if not thread.is_alive():
            _log.info("Scheduler for %s has already finished", self._task_name)
            return

        _log.info("Stopping scheduler for %s", self._task_name)
        self._stop_event.set()
        _log.info("Scheduler successfully stopped")

    def _run_task(self, kwargs: dict[str, Any]) -> None:
        """Run the scheduled task.

        Args:
            kwargs: Arguments to pass to the task.
        """
        start_time = time.time()
        if self._config.send_immediately:
            self._execute_task(kwargs)
        else:
            _log.info(
                "Waiting for first interval before sending the first notification."
            )
            # Event.wait doubles as an interruptible sleep: it returns early
            # when stop() sets the event.
            self._stop_event.wait(self._config.interval)
            self._time_awoke = time.time()

        while not self._stop_event.is_set():
            if self._should_stop(start_time):
                break
            self._execute_task(kwargs)

    def _should_stop(self, start_time: float) -> bool:
        """Determine if the scheduler should stop.

        Args:
            start_time: The time the scheduler started, as returned by
                `time.time()`.

        Returns:
            True if a duration is configured and at least that many seconds
            have passed since `start_time`, False otherwise.
        """
        if self._config.duration is None:
            return False
        return (time.time() - start_time) >= self._config.duration

    def _execute_task(self, kwargs: dict[str, Any]) -> None:
        """Execute the scheduled task and handle interval waiting.

        Exceptions raised by the task are logged and swallowed so that one
        failed execution does not end the schedule.

        Args:
            kwargs: Arguments to pass to the task.
        """
        task_start_time = time.time()
        try:
            if self.task:
                self.task(**kwargs)
        except Exception as e:  # pylint: disable=broad-except
            _log.error(
                "Error occurred during scheduled execution of %s: %s",
                self._task_name,
                e,
            )
        finally:
            # Sleep only for what is left of the interval so that executions
            # stay roughly `interval` seconds apart regardless of task runtime.
            task_elapsed = time.time() - task_start_time
            sleep_duration = max(0, self._config.interval - task_elapsed)
            _log.info(
                "Scheduled execution completed for %s. Sleeping for %d seconds.",
                self._task_name,
                sleep_duration,
            )
            self._stop_event.wait(sleep_duration)
            self._time_awoke = time.time()
Functions¤
__init__ ¤
__init__(config: SchedulerConfig) -> None

Initialise the scheduler.

PARAMETER DESCRIPTION
config

Configuration for the scheduler.

TYPE: SchedulerConfig

Source code in frequenz/lib/notebooks/notification_service.py
def __init__(self, config: SchedulerConfig) -> None:
    """Set up an idle scheduler bound to the given configuration.

    No thread is created here; the thread attribute starts out as None.

    Args:
        config: Configuration for the scheduler.
    """
    # Nothing is scheduled yet: no callable, no task name, no worker thread.
    self.task: Callable[..., None] | None = None
    self._task_name: str | None = None
    self._thread: threading.Thread | None = None

    self._config = config
    self._stop_event = threading.Event()
    # time when the scheduler awoke from sleep
    self._time_awoke: float = 0.0
start ¤
start(task: Callable[..., None], **kwargs: Any) -> None

Start the scheduler for a given task.

PARAMETER DESCRIPTION
task

The task to execute periodically.

TYPE: Callable[..., None]

**kwargs

Arguments to pass to the task.

TYPE: Any DEFAULT: {}

Source code in frequenz/lib/notebooks/notification_service.py
def start(self, task: Callable[..., None], **kwargs: Any) -> None:
    """Start the scheduler for a given task.

    If a previous run is still active, the request is ignored (with a
    warning) so that two threads never execute the task concurrently.
    A stop request left over from an earlier run is cleared so the scheduler
    can be started again.

    Args:
        task: The task to execute periodically.
        **kwargs: Arguments to pass to the task.
    """
    if self._thread is not None and self._thread.is_alive():
        _log.warning(
            "Scheduler for task '%s' is already running; ignoring start request.",
            self._task_name,
        )
        return

    self.task = task
    # Not every callable has __name__ (e.g. functools.partial objects).
    self._task_name = getattr(task, "__name__", repr(task))
    # A previous stop request leaves the event set; without clearing it the
    # new thread would see the request and exit straight away.
    self._stop_event.clear()
    _log.info(
        "Starting scheduler for task '%s' to execute every %d seconds and %s",
        self._task_name,
        self._config.interval,
        (
            f"for {self._config.duration} seconds"
            if self._config.duration is not None
            else "indefinitely"
        ),
    )
    self._thread = threading.Thread(
        target=self._run_task, args=(kwargs,), daemon=True
    )
    self._thread.start()
stop ¤
stop() -> None

Stop the scheduler.

Source code in frequenz/lib/notebooks/notification_service.py
def stop(self) -> None:
    """Stop the scheduler.

    This only sets the stop event and returns immediately; it does not wait
    for the background thread to finish. Nothing is signalled, and no
    success message is logged, when there is no thread or the thread has
    already ended.
    """
    thread = self._thread
    if thread is None:
        _log.warning(
            "Attempted to stop scheduler for %s, but no active thread was found.",
            self._task_name,
        )
        return
    if not thread.is_alive():
        _log.info("Scheduler for %s has already finished", self._task_name)
        return

    _log.info("Stopping scheduler for %s", self._task_name)
    self._stop_event.set()
    _log.info("Scheduler successfully stopped")

frequenz.lib.notebooks.notification_service.SchedulerConfig dataclass ¤

Bases: FromDictMixin

Configuration for the scheduler.

Source code in frequenz/lib/notebooks/notification_service.py
@dataclass
class SchedulerConfig(FromDictMixin):
    """Configuration for the scheduler."""

    send_immediately: bool = field(
        default=False,
        metadata={
            "description": (
                "Whether to send the first notification immediately "
                "upon starting the scheduler or after the first interval"
            ),
        },
    )

    interval: int = field(
        default=60,
        metadata={
            "description": (
                "Frequency in seconds to send the notification if the "
                "scheduler is enabled"
            ),
            "validate": lambda x: x > 0,
        },
    )

    duration: int | None = field(
        default=None,
        metadata={
            "description": (
                "Total duration in seconds to run the scheduler. If None, it runs "
                "indefinitely"
            ),
            "validate": lambda x: x is None or x > 0,
        },
    )
Functions¤
from_dict classmethod ¤
from_dict(data: dict[str, Any]) -> DataclassT

Create an instance of the dataclass from a dictionary.

This method handles:
  • Standard fields: Assigns values directly.
  • Nested dataclasses (that also inherit FromDictMixin): Recursively converts dictionaries into dataclass instances.
  • Optional fields with union types: Extracts the dataclass type from the union if present and handles None values.
  • Type validation: Ensures the input is a dictionary and that the target class is a dataclass; individual field values are passed through unchecked.
PARAMETER DESCRIPTION
data

The data dictionary to be mapped to the dataclass.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DataclassT

An instance of the dataclass.

RAISES DESCRIPTION
TypeError

If the input data is not a dictionary or cls is not a dataclass.

Source code in frequenz/lib/notebooks/notification_service.py
@classmethod
def from_dict(cls: type[DataclassT], data: dict[str, Any]) -> DataclassT:
    """Build an instance of this dataclass from a plain dictionary.

    How entries of `data` are treated:
        - Keys that do not name a field of the dataclass are ignored.
        - For a field annotated with a union, a `None` value is kept as
            `None`; otherwise the first union member that is a dataclass
            inheriting from FromDictMixin (if any) becomes the target type.
        - If the target type is a dataclass inheriting from FromDictMixin and
            the value is a dictionary, the value is converted recursively
            with that type's `from_dict`.
        - Every other value is handed to the constructor unchanged; no
            per-field type checking is done here.

    Args:
        data: The data dictionary to be mapped to the dataclass.

    Returns:
        An instance of the dataclass.

    Raises:
        TypeError: If the input data is not a dictionary or `cls` is not a
            dataclass.
    """
    if not isinstance(data, dict):
        raise TypeError(
            f"Expected a dictionary to create {cls.__name__}, got {type(data)}."
        )
    if not is_dataclass(cls):
        raise TypeError(f"{cls.__name__} is not a dataclass.")

    declared = {spec.name: spec.type for spec in fields(cls)}
    kwargs: dict[str, Any] = {}

    for name, raw in data.items():
        if name not in declared:
            continue
        target = declared[name]

        # Covers both spellings of a union: `X | None` and `Union[X, None]`.
        if isinstance(target, UnionType) or get_origin(target) is Union:
            if raw is None:
                kwargs[name] = None
                continue
            # Narrow to the first nested-dataclass member; when the union has
            # none, keep the union itself so the value passes through below.
            target = next(
                (
                    member
                    for member in get_args(target)
                    if member is not type(None)
                    and is_dataclass(member)
                    and issubclass(member, FromDictMixin)
                ),
                target,
            )

        buildable = (
            isinstance(raw, dict)
            and is_dataclass(target)
            and issubclass(target, FromDictMixin)
        )
        kwargs[name] = target.from_dict(raw) if buildable else raw

    return cls(**kwargs)