frequenz.data.microgrid ¤

Initialize the microgrid data module.

Classes¤

frequenz.data.microgrid.MicrogridConfig dataclass ¤

Configuration of a microgrid.

Source code in frequenz/data/microgrid/config.py
@dataclass
class MicrogridConfig:
    """Configuration of a microgrid."""

    _metadata: Metadata
    """Metadata of the microgrid."""

    _assets_cfg: AssetsConfig
    """Configuration of the assets in the microgrid."""

    _component_types_cfg: dict[str, ComponentTypeConfig]
    """Mapping of component category types to ac power component config."""

    def __init__(self, config_dict: dict[str, Any]) -> None:
        """Initialize the microgrid configuration.

        Args:
            config_dict: Dictionary with component type as key and config as value.
        """
        self._metadata = Metadata(**(config_dict.get("meta") or {}))

        self._assets_cfg = AssetsConfig(
            pv=config_dict.get("pv") or {},
            wind=config_dict.get("wind") or {},
            battery=config_dict.get("battery") or {},
        )

        self._component_types_cfg = {
            ctype: ComponentTypeConfig(component_type=cast(ComponentType, ctype), **cfg)
            for ctype, cfg in config_dict.get("ctype", {}).items()
            if ComponentTypeConfig.is_valid_type(ctype)
        }

    @property
    def meta(self) -> Metadata:
        """Return the metadata of the microgrid."""
        return self._metadata

    @property
    def assets(self) -> AssetsConfig:
        """Return the assets configuration of the microgrid."""
        return self._assets_cfg

    def component_types(self) -> list[str]:
        """Get a list of all component types in the configuration."""
        return list(self._component_types_cfg.keys())

    def component_type_ids(
        self,
        component_type: str,
        component_category: str | None = None,
        metric: str = "",
    ) -> list[int]:
        """Get a list of all component IDs for a component type.

        Args:
            component_type: Component type to be aggregated.
            component_category: Specific category of component IDs to retrieve
                (e.g., "meter", "inverter", or "component"). If not provided,
                the default logic is used.
            metric: Metric name of the formula, if component IDs should be extracted from the formula.

        Returns:
            List of component IDs for this component type.

        Raises:
            ValueError: If the component type is unknown.
            KeyError: If `component_category` is invalid.
        """
        cfg = self._component_types_cfg.get(component_type)
        if not cfg:
            raise ValueError(f"{component_type} not found in config.")

        if component_category:
            valid_categories = get_args(ComponentCategory)
            if component_category not in valid_categories:
                raise KeyError(
                    f"Invalid component category: {component_category}. "
                    f"Valid categories are {valid_categories}"
                )
            category_ids = cast(list[int], getattr(cfg, component_category, []))
            return category_ids

        return cfg.cids(metric)

    def formula(self, component_type: str, metric: str) -> str:
        """Get the formula for a component type.

        Args:
            component_type: Component type to be aggregated.
            metric: Metric to be aggregated.

        Returns:
            Formula to be used for this aggregated component as string.

        Raises:
            ValueError: If the component type is unknown or formula is missing.
        """
        cfg = self._component_types_cfg.get(component_type)
        if not cfg:
            raise ValueError(f"{component_type} not found in config.")
        if cfg.formula is None:
            raise ValueError(f"No formula set for {component_type}")
        formula = cfg.formula.get(metric)
        if not formula:
            raise ValueError(f"{component_type} is missing formula for {metric}")

        return formula

    @staticmethod
    def load_configs(
        microgrid_config_files: str | Path | list[str | Path] | None = None,
        microgrid_config_dir: str | Path | None = None,
    ) -> dict[str, "MicrogridConfig"]:
        """Load multiple microgrid configurations from a file.

        Configs for a single microgrid are expected to be in a single file.
        Later files with the same microgrid ID will overwrite the previous configs.

        Args:
            microgrid_config_files: Path to a single microgrid config file or list of paths.
            microgrid_config_dir: Directory containing multiple microgrid config files.

        Returns:
            Dictionary of single microgrid formula configs with microgrid IDs as keys.

        Raises:
            ValueError: If no config files or dir is provided, or if no config files are found.
        """
        if microgrid_config_files is None and microgrid_config_dir is None:
            raise ValueError(
                "No microgrid config path or directory provided. "
                "Please provide at least one."
            )

        config_files: list[Path] = []

        if microgrid_config_files:
            if isinstance(microgrid_config_files, str):
                config_files = [Path(microgrid_config_files)]
            elif isinstance(microgrid_config_files, Path):
                config_files = [microgrid_config_files]
            elif isinstance(microgrid_config_files, list):
                config_files = [Path(f) for f in microgrid_config_files]

        if microgrid_config_dir:
            if Path(microgrid_config_dir).is_dir():
                config_files += list(Path(microgrid_config_dir).glob("*.toml"))
            else:
                raise ValueError(
                    f"Microgrid config directory {microgrid_config_dir} "
                    "is not a directory"
                )

        if len(config_files) == 0:
            raise ValueError(
                "No microgrid config files found. "
                "Please provide at least one valid config file."
            )

        microgrid_configs: dict[str, "MicrogridConfig"] = {}

        for config_path in config_files:
            if not config_path.is_file():
                _logger.warning("Config path %s is not a file, skipping.", config_path)
                continue

            with config_path.open("rb") as f:
                cfg_dict = tomllib.load(f)
                for microgrid_id, mcfg in cfg_dict.items():
                    _logger.debug(
                        "Loading microgrid config for ID %s from %s",
                        microgrid_id,
                        config_path,
                    )
                    microgrid_configs[microgrid_id] = MicrogridConfig(mcfg)

        return microgrid_configs
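
A minimal construction sketch. The nested field names ("meter", "formula") and the formula string syntax are assumptions inferred from the accessors above, not a definitive schema:

from frequenz.data.microgrid import MicrogridConfig

# Hypothetical config dict: the "ctype" sub-keys mirror assumed
# ComponentTypeConfig fields inferred from the accessors above.
cfg = MicrogridConfig(
    {
        "meta": {},  # forwarded to Metadata(**...)
        "ctype": {
            "grid": {
                "meter": [1],  # assumed component-category field
                "formula": {"AC_ACTIVE_POWER": "#1"},  # metric -> formula string
            },
        },
    }
)
print(cfg.component_types())                   # ["grid"]
print(cfg.formula("grid", "AC_ACTIVE_POWER"))  # "#1"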
Attributes¤
assets property ¤
assets: AssetsConfig

Return the assets configuration of the microgrid.

meta property ¤
meta: Metadata

Return the metadata of the microgrid.

Functions¤
__init__ ¤
__init__(config_dict: dict[str, Any]) -> None

Initialize the microgrid configuration.

PARAMETER DESCRIPTION
config_dict

Dictionary with component type as key and config as value.

TYPE: dict[str, Any]

Source code in frequenz/data/microgrid/config.py
def __init__(self, config_dict: dict[str, Any]) -> None:
    """Initialize the microgrid configuration.

    Args:
        config_dict: Dictionary with component type as key and config as value.
    """
    self._metadata = Metadata(**(config_dict.get("meta") or {}))

    self._assets_cfg = AssetsConfig(
        pv=config_dict.get("pv") or {},
        wind=config_dict.get("wind") or {},
        battery=config_dict.get("battery") or {},
    )

    self._component_types_cfg = {
        ctype: ComponentTypeConfig(component_type=cast(ComponentType, ctype), **cfg)
        for ctype, cfg in config_dict.get("ctype", {}).items()
        if ComponentTypeConfig.is_valid_type(ctype)
    }
component_type_ids ¤
component_type_ids(
    component_type: str,
    component_category: str | None = None,
    metric: str = "",
) -> list[int]

Get a list of all component IDs for a component type.

PARAMETER DESCRIPTION
component_type

Component type to be aggregated.

TYPE: str

component_category

Specific category of component IDs to retrieve (e.g., "meter", "inverter", or "component"). If not provided, the default logic is used.

TYPE: str | None DEFAULT: None

metric

Metric name of the formula, if component IDs should be extracted from the formula.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
list[int]

List of component IDs for this component type.

RAISES DESCRIPTION
ValueError

If the component type is unknown.

KeyError

If component_category is invalid.

Source code in frequenz/data/microgrid/config.py
def component_type_ids(
    self,
    component_type: str,
    component_category: str | None = None,
    metric: str = "",
) -> list[int]:
    """Get a list of all component IDs for a component type.

    Args:
        component_type: Component type to be aggregated.
        component_category: Specific category of component IDs to retrieve
            (e.g., "meter", "inverter", or "component"). If not provided,
            the default logic is used.
        metric: Metric name of the formula, if component IDs should be extracted from the formula.

    Returns:
        List of component IDs for this component type.

    Raises:
        ValueError: If the component type is unknown.
        KeyError: If `component_category` is invalid.
    """
    cfg = self._component_types_cfg.get(component_type)
    if not cfg:
        raise ValueError(f"{component_type} not found in config.")

    if component_category:
        valid_categories = get_args(ComponentCategory)
        if component_category not in valid_categories:
            raise KeyError(
                f"Invalid component category: {component_category}. "
                f"Valid categories are {valid_categories}"
            )
        category_ids = cast(list[int], getattr(cfg, component_category, []))
        return category_ids

    return cfg.cids(metric)
component_types ¤
component_types() -> list[str]

Get a list of all component types in the configuration.

Source code in frequenz/data/microgrid/config.py
def component_types(self) -> list[str]:
    """Get a list of all component types in the configuration."""
    return list(self._component_types_cfg.keys())
formula ¤
formula(component_type: str, metric: str) -> str

Get the formula for a component type.

PARAMETER DESCRIPTION
component_type

Component type to be aggregated.

TYPE: str

metric

Metric to be aggregated.

TYPE: str

RETURNS DESCRIPTION
str

Formula to be used for this aggregated component as string.

RAISES DESCRIPTION
ValueError

If the component type is unknown or formula is missing.

Source code in frequenz/data/microgrid/config.py
def formula(self, component_type: str, metric: str) -> str:
    """Get the formula for a component type.

    Args:
        component_type: Component type to be aggregated.
        metric: Metric to be aggregated.

    Returns:
        Formula to be used for this aggregated component as string.

    Raises:
        ValueError: If the component type is unknown or formula is missing.
    """
    cfg = self._component_types_cfg.get(component_type)
    if not cfg:
        raise ValueError(f"{component_type} not found in config.")
    if cfg.formula is None:
        raise ValueError(f"No formula set for {component_type}")
    formula = cfg.formula.get(metric)
    if not formula:
        raise ValueError(f"{component_type} is missing formula for {metric}")

    return formula
load_configs staticmethod ¤
load_configs(
    microgrid_config_files: (
        str | Path | list[str | Path] | None
    ) = None,
    microgrid_config_dir: str | Path | None = None,
) -> dict[str, MicrogridConfig]

Load multiple microgrid configurations from files or a directory.

Configs for a single microgrid are expected to be in a single file. Later files with the same microgrid ID will overwrite the previous configs.

PARAMETER DESCRIPTION
microgrid_config_files

Path to a single microgrid config file or list of paths.

TYPE: str | Path | list[str | Path] | None DEFAULT: None

microgrid_config_dir

Directory containing multiple microgrid config files.

TYPE: str | Path | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, MicrogridConfig]

Dictionary of single microgrid formula configs with microgrid IDs as keys.

RAISES DESCRIPTION
ValueError

If no config files or dir is provided, or if no config files are found.

Source code in frequenz/data/microgrid/config.py
@staticmethod
def load_configs(
    microgrid_config_files: str | Path | list[str | Path] | None = None,
    microgrid_config_dir: str | Path | None = None,
) -> dict[str, "MicrogridConfig"]:
    """Load multiple microgrid configurations from a file.

    Configs for a single microgrid are expected to be in a single file.
    Later files with the same microgrid ID will overwrite the previous configs.

    Args:
        microgrid_config_files: Path to a single microgrid config file or list of paths.
        microgrid_config_dir: Directory containing multiple microgrid config files.

    Returns:
        Dictionary of single microgrid formula configs with microgrid IDs as keys.

    Raises:
        ValueError: If no config files or dir is provided, or if no config files are found.
    """
    if microgrid_config_files is None and microgrid_config_dir is None:
        raise ValueError(
            "No microgrid config path or directory provided. "
            "Please provide at least one."
        )

    config_files: list[Path] = []

    if microgrid_config_files:
        if isinstance(microgrid_config_files, str):
            config_files = [Path(microgrid_config_files)]
        elif isinstance(microgrid_config_files, Path):
            config_files = [microgrid_config_files]
        elif isinstance(microgrid_config_files, list):
            config_files = [Path(f) for f in microgrid_config_files]

    if microgrid_config_dir:
        if Path(microgrid_config_dir).is_dir():
            config_files += list(Path(microgrid_config_dir).glob("*.toml"))
        else:
            raise ValueError(
                f"Microgrid config directory {microgrid_config_dir} "
                "is not a directory"
            )

    if len(config_files) == 0:
        raise ValueError(
            "No microgrid config files found. "
            "Please provide at least one valid config file."
        )

    microgrid_configs: dict[str, "MicrogridConfig"] = {}

    for config_path in config_files:
        if not config_path.is_file():
            _logger.warning("Config path %s is not a file, skipping.", config_path)
            continue

        with config_path.open("rb") as f:
            cfg_dict = tomllib.load(f)
            for microgrid_id, mcfg in cfg_dict.items():
                _logger.debug(
                    "Loading microgrid config for ID %s from %s",
                    microgrid_id,
                    config_path,
                )
                microgrid_configs[microgrid_id] = MicrogridConfig(mcfg)

    return microgrid_configs
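
For illustration, a hypothetical file layout: each top-level TOML table is a microgrid ID whose contents are passed to the MicrogridConfig constructor, so the inner field names follow the same assumptions as the sketch above:

# configs/site42.toml (hypothetical layout)
[42.ctype.grid]
meter = [1]
formula = { AC_ACTIVE_POWER = "#1" }

from frequenz.data.microgrid import MicrogridConfig

microgrid_configs = MicrogridConfig.load_configs(microgrid_config_dir="configs")
print(microgrid_configs["42"].component_types())  # ["grid"]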

frequenz.data.microgrid.MicrogridData ¤

Fetch power data for component types of a microgrid.

Source code in frequenz/data/microgrid/component_data.py
class MicrogridData:
    """Fetch power data for component types of a microgrid."""

    def __init__(
        self,
        server_url: str,
        auth_key: str,
        sign_secret: str,
        microgrid_configs: dict[str, MicrogridConfig] | None = None,
    ) -> None:
        """Initialize microgrid data.

        Args:
            server_url: URL of the reporting service.
            auth_key: Authentication key to the service.
            sign_secret: Secret for signing requests.
            microgrid_configs: Mapping of microgrid IDs to MicrogridConfig objects.
        """
        self._microgrid_configs = microgrid_configs or {}
        self._client = ReportingApiClient(
            server_url=server_url, auth_key=auth_key, sign_secret=sign_secret
        )

    @property
    def microgrid_ids(self) -> list[str]:
        """Get the microgrid IDs.

        Returns:
            List of microgrid IDs.
        """
        return list(self._microgrid_configs.keys())

    @property
    def microgrid_configs(self) -> dict[str, MicrogridConfig]:
        """Return the microgrid configurations."""
        return self._microgrid_configs

    # pylint: disable=too-many-locals
    async def metric_data(  # pylint: disable=too-many-arguments
        self,
        *,
        microgrid_id: int,
        start: datetime,
        end: datetime,
        component_types: tuple[str, ...] = ("grid", "pv", "battery"),
        resampling_period: timedelta = timedelta(seconds=10),
        metric: str = "AC_ACTIVE_POWER",
        keep_components: bool = False,
        splits: bool = False,
    ) -> pd.DataFrame | None:
        """Power data for component types of a microgrid.

        Args:
            microgrid_id: Microgrid ID.
            start: Start timestamp.
            end: End timestamp.
            component_types: List of component types to be aggregated.
            resampling_period: Data resampling period.
            metric: Metric to be fetched.
            keep_components: Include individual components in output.
            splits: Include columns for positive and negative power values for components.

        Returns:
            DataFrame with the metric data of aggregated components,
            or None if no data is available.
        """
        mcfg = self._microgrid_configs[f"{microgrid_id}"]

        formulas = {
            ctype: mcfg.formula(ctype, metric.upper()) for ctype in component_types
        }

        _logger.debug("Formulas: %s", formulas)

        metric_enum = Metric[metric.upper()]
        data = [
            sample
            for ctype, formula in formulas.items()
            async for sample in self._client.receive_aggregated_data(
                microgrid_id=microgrid_id,
                metric=metric_enum,
                aggregation_formula=formula,
                start_time=start,
                end_time=end,
                resampling_period=resampling_period,
            )
        ]

        all_cids = []
        if keep_components:
            all_cids = [
                cid
                for ctype in component_types
                for cid in mcfg.component_type_ids(ctype, metric=metric)
            ]
            _logger.debug("CIDs: %s", all_cids)
            microgrid_components = [
                (microgrid_id, all_cids),
            ]
            data_comp = [
                sample
                async for sample in self._client.receive_microgrid_components_data(
                    microgrid_components=microgrid_components,
                    metrics=metric_enum,
                    start_time=start,
                    end_time=end,
                    resampling_period=resampling_period,
                )
            ]
            data.extend(data_comp)

        if len(data) == 0:
            _logger.warning("No data found")
            return None

        df = pd.DataFrame(data)
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        assert df["timestamp"].dt.tz is not None, "Timestamps are not tz-aware"

        # Remove duplicates
        dup_mask = df.duplicated(keep="first")
        if dup_mask.any():
            _logger.info("Found %s rows that have duplicates", dup_mask.sum())
        df = df[~dup_mask]

        # Pivot table
        df = df.pivot_table(index="timestamp", columns="component_id", values="value")
        # Rename formula columns
        rename_cols: dict[str, str] = {}
        for ctype, formula in formulas.items():
            if formula in rename_cols:
                _logger.warning(
                    "Ignoring %s since formula %s exists already for %s",
                    ctype,
                    formula,
                    rename_cols[formula],
                )
                continue
            rename_cols[formula] = ctype

        df = df.rename(columns=rename_cols)
        if keep_components:
            # Set missing columns to NaN
            for cid in all_cids:
                if cid not in df.columns:
                    _logger.warning(
                        "Component ID %s not found in data, setting zero", cid
                    )
                    df.loc[:, cid] = np.nan

        # Make string columns
        df.columns = [str(e) for e in df.columns]  # type: ignore

        cols = df.columns
        if splits:
            pos_cols = [f"{col}_pos" for col in cols]
            neg_cols = [f"{col}_neg" for col in cols]
            df[pos_cols] = df[cols].clip(lower=0)
            df[neg_cols] = df[cols].clip(upper=0)

        # Sort columns
        ctypes = list(rename_cols.values())
        new_cols = [e for e in ctypes if e in df.columns] + sorted(
            [e for e in df.columns if e not in ctypes]
        )
        df = df[new_cols]

        return df

    async def ac_active_power(  # pylint: disable=too-many-arguments
        self,
        *,
        microgrid_id: int,
        start: datetime,
        end: datetime,
        component_types: tuple[str, ...] = ("grid", "pv", "battery"),
        resampling_period: timedelta = timedelta(seconds=10),
        keep_components: bool = False,
        splits: bool = False,
        unit: str = "kW",
    ) -> pd.DataFrame | None:
        """Power data for component types of a microgrid."""
        df = await self.metric_data(
            microgrid_id=microgrid_id,
            start=start,
            end=end,
            component_types=component_types,
            resampling_period=resampling_period,
            metric="AC_ACTIVE_POWER",
            keep_components=keep_components,
            splits=splits,
        )
        if df is None:
            return df

        if unit == "W":
            pass
        if unit == "kW":
            df = df / 1000
        elif unit == "MW":
            df = df / 1e6
        else:
            raise ValueError(f"Unknown unit: {unit}")
        return df

    async def soc(  # pylint: disable=too-many-arguments
        self,
        *,
        microgrid_id: int,
        start: datetime,
        end: datetime,
        resampling_period: timedelta = timedelta(seconds=10),
        keep_components: bool = False,
    ) -> pd.DataFrame | None:
        """Soc data for component types of a microgrid."""
        df = await self.metric_data(
            microgrid_id=microgrid_id,
            start=start,
            end=end,
            component_types=("battery",),
            resampling_period=resampling_period,
            metric="BATTERY_SOC_PCT",
            keep_components=keep_components,
        )
        return df
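
A minimal end-to-end sketch, assuming the reporting service is reachable; the server URL, keys, and microgrid ID below are placeholders:

import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.data.microgrid import MicrogridConfig, MicrogridData

async def main() -> None:
    configs = MicrogridConfig.load_configs(microgrid_config_dir="configs")
    mdata = MicrogridData(
        server_url="grpc://reporting.example.com",  # placeholder
        auth_key="my-auth-key",                     # placeholder
        sign_secret="my-sign-secret",               # placeholder
        microgrid_configs=configs,
    )
    end = datetime.now(timezone.utc)
    # Fetch the last hour of AC active power, aggregated per component type.
    df = await mdata.ac_active_power(
        microgrid_id=42,
        start=end - timedelta(hours=1),
        end=end,
        unit="kW",
    )
    print(df)

asyncio.run(main())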
Attributes¤
microgrid_configs property ¤
microgrid_configs: dict[str, MicrogridConfig]

Return the microgrid configurations.

microgrid_ids property ¤
microgrid_ids: list[str]

Get the microgrid IDs.

RETURNS DESCRIPTION
list[str]

List of microgrid IDs.

Functions¤
__init__ ¤
__init__(
    server_url: str,
    auth_key: str,
    sign_secret: str,
    microgrid_configs: (
        dict[str, MicrogridConfig] | None
    ) = None,
) -> None

Initialize microgrid data.

PARAMETER DESCRIPTION
server_url

URL of the reporting service.

TYPE: str

auth_key

Authentication key to the service.

TYPE: str

sign_secret

Secret for signing requests.

TYPE: str

microgrid_configs

Mapping of microgrid IDs to MicrogridConfig objects.

TYPE: dict[str, MicrogridConfig] | None DEFAULT: None

Source code in frequenz/data/microgrid/component_data.py
def __init__(
    self,
    server_url: str,
    auth_key: str,
    sign_secret: str,
    microgrid_configs: dict[str, MicrogridConfig] | None = None,
) -> None:
    """Initialize microgrid data.

    Args:
        server_url: URL of the reporting service.
        auth_key: Authentication key to the service.
        sign_secret: Secret for signing requests.
        microgrid_configs: Mapping of microgrid IDs to MicrogridConfig objects.
    """
    self._microgrid_configs = microgrid_configs or {}
    self._client = ReportingApiClient(
        server_url=server_url, auth_key=auth_key, sign_secret=sign_secret
    )
ac_active_power async ¤
ac_active_power(
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = (
        "grid",
        "pv",
        "battery",
    ),
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False,
    splits: bool = False,
    unit: str = "kW"
) -> DataFrame | None

AC active power data for component types of a microgrid, in the given unit.

Source code in frequenz/data/microgrid/component_data.py
async def ac_active_power(  # pylint: disable=too-many-arguments
    self,
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = ("grid", "pv", "battery"),
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False,
    splits: bool = False,
    unit: str = "kW",
) -> pd.DataFrame | None:
    """Power data for component types of a microgrid."""
    df = await self.metric_data(
        microgrid_id=microgrid_id,
        start=start,
        end=end,
        component_types=component_types,
        resampling_period=resampling_period,
        metric="AC_ACTIVE_POWER",
        keep_components=keep_components,
        splits=splits,
    )
    if df is None:
        return df

    if unit == "W":
        pass
    if unit == "kW":
        df = df / 1000
    elif unit == "MW":
        df = df / 1e6
    else:
        raise ValueError(f"Unknown unit: {unit}")
    return df
metric_data async ¤
metric_data(
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = (
        "grid",
        "pv",
        "battery",
    ),
    resampling_period: timedelta = timedelta(seconds=10),
    metric: str = "AC_ACTIVE_POWER",
    keep_components: bool = False,
    splits: bool = False
) -> DataFrame | None

Fetch metric data for component types of a microgrid.

PARAMETER DESCRIPTION
microgrid_id

Microgrid ID.

TYPE: int

start

Start timestamp.

TYPE: datetime

end

End timestamp.

TYPE: datetime

component_types

List of component types to be aggregated.

TYPE: tuple[str, ...] DEFAULT: ('grid', 'pv', 'battery')

resampling_period

Data resampling period.

TYPE: timedelta DEFAULT: timedelta(seconds=10)

metric

Metric to be fetched.

TYPE: str DEFAULT: 'AC_ACTIVE_POWER'

keep_components

Include individual components in output.

TYPE: bool DEFAULT: False

splits

Include columns for positive and negative power values for components.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
DataFrame | None

DataFrame with the metric data of aggregated components, or None if no data is available.

Source code in frequenz/data/microgrid/component_data.py
async def metric_data(  # pylint: disable=too-many-arguments
    self,
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = ("grid", "pv", "battery"),
    resampling_period: timedelta = timedelta(seconds=10),
    metric: str = "AC_ACTIVE_POWER",
    keep_components: bool = False,
    splits: bool = False,
) -> pd.DataFrame | None:
    """Power data for component types of a microgrid.

    Args:
        microgrid_id: Microgrid ID.
        start: Start timestamp.
        end: End timestamp.
        component_types: List of component types to be aggregated.
        resampling_period: Data resampling period.
        metric: Metric to be fetched.
        keep_components: Include individual components in output.
        splits: Include columns for positive and negative power values for components.

    Returns:
        DataFrame with the metric data of aggregated components,
        or None if no data is available.
    """
    mcfg = self._microgrid_configs[f"{microgrid_id}"]

    formulas = {
        ctype: mcfg.formula(ctype, metric.upper()) for ctype in component_types
    }

    _logger.debug("Formulas: %s", formulas)

    metric_enum = Metric[metric.upper()]
    data = [
        sample
        for ctype, formula in formulas.items()
        async for sample in self._client.receive_aggregated_data(
            microgrid_id=microgrid_id,
            metric=metric_enum,
            aggregation_formula=formula,
            start_time=start,
            end_time=end,
            resampling_period=resampling_period,
        )
    ]

    all_cids = []
    if keep_components:
        all_cids = [
            cid
            for ctype in component_types
            for cid in mcfg.component_type_ids(ctype, metric=metric)
        ]
        _logger.debug("CIDs: %s", all_cids)
        microgrid_components = [
            (microgrid_id, all_cids),
        ]
        data_comp = [
            sample
            async for sample in self._client.receive_microgrid_components_data(
                microgrid_components=microgrid_components,
                metrics=metric_enum,
                start_time=start,
                end_time=end,
                resampling_period=resampling_period,
            )
        ]
        data.extend(data_comp)

    if len(data) == 0:
        _logger.warning("No data found")
        return None

    df = pd.DataFrame(data)
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    assert df["timestamp"].dt.tz is not None, "Timestamps are not tz-aware"

    # Remove duplicates
    dup_mask = df.duplicated(keep="first")
    if dup_mask.any():
        _logger.info("Found %s rows that have duplicates", dup_mask.sum())
    df = df[~dup_mask]

    # Pivot table
    df = df.pivot_table(index="timestamp", columns="component_id", values="value")
    # Rename formula columns
    rename_cols: dict[str, str] = {}
    for ctype, formula in formulas.items():
        if formula in rename_cols:
            _logger.warning(
                "Ignoring %s since formula %s exists already for %s",
                ctype,
                formula,
                rename_cols[formula],
            )
            continue
        rename_cols[formula] = ctype

    df = df.rename(columns=rename_cols)
    if keep_components:
        # Set missing columns to NaN
        for cid in all_cids:
            if cid not in df.columns:
                _logger.warning(
                    "Component ID %s not found in data, setting zero", cid
                )
                df.loc[:, cid] = np.nan

    # Make string columns
    df.columns = [str(e) for e in df.columns]  # type: ignore

    cols = df.columns
    if splits:
        pos_cols = [f"{col}_pos" for col in cols]
        neg_cols = [f"{col}_neg" for col in cols]
        df[pos_cols] = df[cols].clip(lower=0)
        df[neg_cols] = df[cols].clip(upper=0)

    # Sort columns
    ctypes = list(rename_cols.values())
    new_cols = [e for e in ctypes if e in df.columns] + sorted(
        [e for e in df.columns if e not in ctypes]
    )
    df = df[new_cols]

    return df
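
For example, a call sketch that also requests per-component columns and sign splits (mdata, start, and end as in the earlier sketch; the resulting column names depend on the configured formulas and component IDs):

df = await mdata.metric_data(
    microgrid_id=42,
    start=start,
    end=end,
    component_types=("grid", "battery"),
    metric="AC_ACTIVE_POWER",
    keep_components=True,  # one extra column per component ID
    splits=True,           # adds "<col>_pos" and "<col>_neg" columns
)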
soc async ¤
soc(
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False
) -> DataFrame | None

State-of-charge (SoC) data for the battery components of a microgrid.

Source code in frequenz/data/microgrid/component_data.py
async def soc(  # pylint: disable=too-many-arguments
    self,
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False,
) -> pd.DataFrame | None:
    """Soc data for component types of a microgrid."""
    df = await self.metric_data(
        microgrid_id=microgrid_id,
        start=start,
        end=end,
        component_types=("battery",),
        resampling_period=resampling_period,
        metric="BATTERY_SOC_PCT",
        keep_components=keep_components,
    )
    return df

frequenz.data.microgrid.StatefulDataFetcher ¤

A helper class to handle fetching of microgrid component data.

This class provides methods to query new data for a specific microgrid and its components, and to write the updated data to a temporary file. The data is first written to a temporary file, which must be committed on success or rolled back on failure.

Source code in frequenz/data/microgrid/_stateful_data_fetcher.py
class StatefulDataFetcher:
    """A helper class to handle fetching of microgrid component data.

    This class provides methods to query new data for a specific microgrid and
    its components, and to write the updated data to a temporary file.
    The data is first written to a temporary file, which must be committed
    on success or rolled back on failure.
    """

    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        microgrid_data: MicrogridData,
        data_buffer_dir: Path,
        resampling_period: timedelta,
        end_time_delta: timedelta = END_TIME_DELTA,
        initial_period: timedelta = timedelta(hours=1),
    ) -> None:
        """Initialize the TransactionalDataExport class.

        Args:
            microgrid_data: An instance of MicrogridData to query data from.
            data_buffer_dir: The path to the directory for buffering data.
            resampling_period: The period to resample the data.
            end_time_delta: The time subtracted from now to be passed as
                            end_time to the API.
            initial_period: The initial period to use for the first data fetch.
        """
        _logger.debug("Initializing StatefulDataFetcher instance.")
        self._microgrid_data = microgrid_data
        self._data_buffer = data_buffer_dir
        self._resampling_period = resampling_period
        self._end_time_delta = end_time_delta
        self._initial_period = initial_period

        self._data_buffer.mkdir(parents=True, exist_ok=True)

        # Maps temp_path -> final_path for files that need to be committed
        self._temp_files_to_commit: dict[Path, Path] = {}

    @staticmethod
    def _generate_filename(
        microgrid_id: int, components: tuple[str, ...], metric: str
    ) -> str:
        """Generate a unique, sanitized filename.

        Args:
            microgrid_id: The ID of the microgrid.
            components: The component types to include in the filename.
            metric: The name of the metric to include in the filename.

        Returns:
            A sanitized string suitable for use as a filename.
        """
        sorted_components = sorted([c.lower().replace(" ", "_") for c in components])
        return (
            f"mid{microgrid_id}_{'_'.join(sorted_components)}_{metric.lower()}.parquet"
        )

    async def receive_microgrid_data(
        self,
        microgrid_id: int,
        components: tuple[str, ...],
        metric: str,
    ) -> pd.DataFrame | None:
        """Query new microgrid data and write the updated buffer to a temp file.

        Reads the last timestamp from the main buffer file, queries for new data,
        and writes the combined data to a new temporary buffer file.

        Args:
            microgrid_id: The ID of the microgrid to query.
            components: The component types to include in the query.
            metric: The metric to query.

        Returns:
            A pandas DataFrame with only the new data points, or None if
            there is no new data.
        """
        _logger.debug(
            "Processing data for microgrid ID %s, components %s",
            microgrid_id,
            components,
        )

        final_parquet_file = self._data_buffer / self._generate_filename(
            microgrid_id, components, metric
        )
        temp_parquet_file = final_parquet_file.with_suffix(
            final_parquet_file.suffix + ".tmp"
        )

        now = datetime.now(timezone.utc)
        end_time = now - self._end_time_delta
        on_disk_buffer = pd.DataFrame()
        start_time: datetime

        try:
            on_disk_buffer = pd.read_parquet(final_parquet_file)
            if not on_disk_buffer.empty and isinstance(
                on_disk_buffer.index, pd.DatetimeIndex
            ):
                last_timestamp = on_disk_buffer.index.max().to_pydatetime()
                start_time = last_timestamp + timedelta(microseconds=1)
                _logger.info("Read buffer. Last timestamp: %s.", last_timestamp)
            else:
                _logger.info(
                    "Buffer file is empty. Defaulting to: %s.", self._initial_period
                )
                start_time = now - self._initial_period
        except FileNotFoundError:
            _logger.info(
                "No buffer file at '%s'. Defaulting to %s.",
                final_parquet_file,
                self._initial_period,
            )
            start_time = now - self._initial_period

        if start_time >= end_time:
            _logger.info("Buffer is up to date. No new data to fetch.")
            return None

        _logger.info("Fetching new data from %s to %s...", start_time, end_time)

        new_df = await self._microgrid_data.metric_data(
            microgrid_id=microgrid_id,
            start=start_time,
            end=end_time,
            component_types=components,
            resampling_period=self._resampling_period,
            metric=metric,
        )

        if new_df is not None and not new_df.empty:
            _logger.info("Fetched %d new data points.", len(new_df))
            combined_df = pd.concat([on_disk_buffer, new_df])
            combined_df = combined_df[~combined_df.index.duplicated(keep="last")]
            combined_df.sort_index(inplace=True)
            updated_buffer = combined_df.tail(BUFFER_SIZE)

            # Write to temporary file instead of final destination
            updated_buffer.to_parquet(temp_parquet_file, engine="pyarrow")

            _logger.info(
                "Wrote updated buffer with %d points to temporary file '%s'.",
                len(updated_buffer),
                temp_parquet_file,
            )

            self._temp_files_to_commit[temp_parquet_file] = final_parquet_file
        else:
            _logger.info("No new data was returned from the API.")

        return new_df

    def commit(self) -> None:
        """Commit the temporary files to their final locations."""
        _logger.info("Reports sent successfully. Committing buffer updates.")
        for temp_path, final_path in self._temp_files_to_commit.items():
            try:
                os.replace(temp_path, final_path)
                _logger.debug(
                    "Committed buffer update: %s -> %s", temp_path, final_path
                )
            except OSError as e:
                _logger.error(
                    "Failed to commit buffer update from %s to %s: %s",
                    temp_path,
                    final_path,
                    e,
                )

        self._temp_files_to_commit.clear()

    def rollback(self) -> None:
        """Rollback the temporary files if the transaction fails."""
        for temp_path in self._temp_files_to_commit:
            try:
                temp_path.unlink()
                _logger.debug("Rolled back by deleting temp file: %s", temp_path)
            except OSError as unlink_e:
                _logger.error(
                    "Failed to clean up temporary buffer file %s during rollback: %s",
                    temp_path,
                    unlink_e,
                )

        self._temp_files_to_commit.clear()
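
A sketch of the fetch/commit/rollback cycle, reusing the mdata instance from the earlier sketch; send_report is a hypothetical stand-in for whatever consumes the data:

from datetime import timedelta
from pathlib import Path

from frequenz.data.microgrid import StatefulDataFetcher

fetcher = StatefulDataFetcher(
    microgrid_data=mdata,
    data_buffer_dir=Path("data_buffer"),
    resampling_period=timedelta(seconds=10),
)
try:
    # Inside an async function, as in the sketch above.
    new_df = await fetcher.receive_microgrid_data(
        microgrid_id=42,
        components=("grid", "battery"),
        metric="AC_ACTIVE_POWER",
    )
    if new_df is not None:
        send_report(new_df)  # hypothetical consumer
    fetcher.commit()    # atomically replace buffer files with the temp files
except Exception:
    fetcher.rollback()  # delete the temp files, keep the old buffers
    raise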
Functions¤
__init__ ¤
__init__(
    microgrid_data: MicrogridData,
    data_buffer_dir: Path,
    resampling_period: timedelta,
    end_time_delta: timedelta = END_TIME_DELTA,
    initial_period: timedelta = timedelta(hours=1),
) -> None

Initialize the StatefulDataFetcher class.

PARAMETER DESCRIPTION
microgrid_data

An instance of MicrogridData to query data from.

TYPE: MicrogridData

data_buffer_dir

The path to the directory for buffering data.

TYPE: Path

resampling_period

The period to resample the data.

TYPE: timedelta

end_time_delta

The time subtracted from now to be passed as end_time to the API.

TYPE: timedelta DEFAULT: END_TIME_DELTA

initial_period

The initial period to use for the first data fetch.

TYPE: timedelta DEFAULT: timedelta(hours=1)

Source code in frequenz/data/microgrid/_stateful_data_fetcher.py
def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
    self,
    microgrid_data: MicrogridData,
    data_buffer_dir: Path,
    resampling_period: timedelta,
    end_time_delta: timedelta = END_TIME_DELTA,
    initial_period: timedelta = timedelta(hours=1),
) -> None:
    """Initialize the TransactionalDataExport class.

    Args:
        microgrid_data: An instance of MicrogridData to query data from.
        data_buffer_dir: The path to the directory for buffering data.
        resampling_period: The period to resample the data.
        end_time_delta: The time subtracted from now to be passed as
                        end_time to the API.
        initial_period: The initial period to use for the first data fetch.
    """
    _logger.debug("Initializing StatefulDataFetcher instance.")
    self._microgrid_data = microgrid_data
    self._data_buffer = data_buffer_dir
    self._resampling_period = resampling_period
    self._end_time_delta = end_time_delta
    self._initial_period = initial_period

    self._data_buffer.mkdir(parents=True, exist_ok=True)

    # Maps temp_path -> final_path for files that need to be committed
    self._temp_files_to_commit: dict[Path, Path] = {}
commit ¤
commit() -> None

Commit the temporary files to their final locations.

Source code in frequenz/data/microgrid/_stateful_data_fetcher.py
def commit(self) -> None:
    """Commit the temporary files to their final locations."""
    _logger.info("Reports sent successfully. Committing buffer updates.")
    for temp_path, final_path in self._temp_files_to_commit.items():
        try:
            os.replace(temp_path, final_path)
            _logger.debug(
                "Committed buffer update: %s -> %s", temp_path, final_path
            )
        except OSError as e:
            _logger.error(
                "Failed to commit buffer update from %s to %s: %s",
                temp_path,
                final_path,
                e,
            )

    self._temp_files_to_commit.clear()
receive_microgrid_data async ¤
receive_microgrid_data(
    microgrid_id: int,
    components: tuple[str, ...],
    metric: str,
) -> DataFrame | None

Query new microgrid data and write the updated buffer to a temp file.

Reads the last timestamp from the main buffer file, queries for new data, and writes the combined data to a new temporary buffer file.

PARAMETER DESCRIPTION
microgrid_id

The ID of the microgrid to query.

TYPE: int

components

The component types to include in the query.

TYPE: tuple[str, ...]

metric

The metric to query.

TYPE: str

RETURNS DESCRIPTION
DataFrame | None

A pandas DataFrame with only the new data points, or None if there is no new data.

Source code in frequenz/data/microgrid/_stateful_data_fetcher.py
async def receive_microgrid_data(
    self,
    microgrid_id: int,
    components: tuple[str, ...],
    metric: str,
) -> pd.DataFrame | None:
    """Query new microgrid data and write the updated buffer to a temp file.

    Reads the last timestamp from the main buffer file, queries for new data,
    and writes the combined data to a new temporary buffer file.

    Args:
        microgrid_id: The ID of the microgrid to query.
        components: The component types to include in the query.
        metric: The metric to query.

    Returns:
        A pandas DataFrame with only the new data points, or None if
        there is no new data.
    """
    _logger.debug(
        "Processing data for microgrid ID %s, components %s",
        microgrid_id,
        components,
    )

    final_parquet_file = self._data_buffer / self._generate_filename(
        microgrid_id, components, metric
    )
    temp_parquet_file = final_parquet_file.with_suffix(
        final_parquet_file.suffix + ".tmp"
    )

    now = datetime.now(timezone.utc)
    end_time = now - self._end_time_delta
    on_disk_buffer = pd.DataFrame()
    start_time: datetime

    try:
        on_disk_buffer = pd.read_parquet(final_parquet_file)
        if not on_disk_buffer.empty and isinstance(
            on_disk_buffer.index, pd.DatetimeIndex
        ):
            last_timestamp = on_disk_buffer.index.max().to_pydatetime()
            start_time = last_timestamp + timedelta(microseconds=1)
            _logger.info("Read buffer. Last timestamp: %s.", last_timestamp)
        else:
            _logger.info(
                "Buffer file is empty. Defaulting to: %s.", self._initial_period
            )
            start_time = now - self._initial_period
    except FileNotFoundError:
        _logger.info(
            "No buffer file at '%s'. Defaulting to %s.",
            final_parquet_file,
            self._initial_period,
        )
        start_time = now - self._initial_period

    if start_time >= end_time:
        _logger.info("Buffer is up to date. No new data to fetch.")
        return None

    _logger.info("Fetching new data from %s to %s...", start_time, end_time)

    new_df = await self._microgrid_data.metric_data(
        microgrid_id=microgrid_id,
        start=start_time,
        end=end_time,
        component_types=components,
        resampling_period=self._resampling_period,
        metric=metric,
    )

    if new_df is not None and not new_df.empty:
        _logger.info("Fetched %d new data points.", len(new_df))
        combined_df = pd.concat([on_disk_buffer, new_df])
        combined_df = combined_df[~combined_df.index.duplicated(keep="last")]
        combined_df.sort_index(inplace=True)
        updated_buffer = combined_df.tail(BUFFER_SIZE)

        # Write to temporary file instead of final destination
        updated_buffer.to_parquet(temp_parquet_file, engine="pyarrow")

        _logger.info(
            "Wrote updated buffer with %d points to temporary file '%s'.",
            len(updated_buffer),
            temp_parquet_file,
        )

        self._temp_files_to_commit[temp_parquet_file] = final_parquet_file
    else:
        _logger.info("No new data was returned from the API.")

    return new_df
rollback ¤
rollback() -> None

Rollback the temporary files if the transaction fails.

Source code in frequenz/data/microgrid/_stateful_data_fetcher.py
def rollback(self) -> None:
    """Rollback the temporary files if the transaction fails."""
    for temp_path in self._temp_files_to_commit:
        try:
            temp_path.unlink()
            _logger.debug("Rolled back by deleting temp file: %s", temp_path)
        except OSError as unlink_e:
            _logger.error(
                "Failed to clean up temporary buffer file %s during rollback: %s",
                temp_path,
                unlink_e,
            )

    self._temp_files_to_commit.clear()