Skip to content

Index

frequenz.data.microgrid ¤

Initialize the microgrid data module.

Classes¤

frequenz.data.microgrid.MicrogridConfig dataclass ¤

Configuration of a microgrid.

Source code in frequenz/data/microgrid/config.py
@dataclass
class MicrogridConfig:
    """Configuration of a microgrid."""

    _metadata: Metadata
    """Metadata of the microgrid."""

    _assets_cfg: AssetsConfig
    """Configuration of the assets in the microgrid."""

    _component_types_cfg: dict[str, ComponentTypeConfig]
    """Mapping of component category types to ac power component config."""

    def __init__(self, config_dict: dict[str, Any]) -> None:
        """Initialize the microgrid configuration.

        Args:
            config_dict: Dictionary with component type as key and config as value.
        """
        self._metadata = Metadata(**(config_dict.get("meta") or {}))

        self._assets_cfg = AssetsConfig(
            pv=config_dict.get("pv") or {},
            wind=config_dict.get("wind") or {},
            battery=config_dict.get("battery") or {},
        )

        self._component_types_cfg = {
            ctype: ComponentTypeConfig(component_type=cast(ComponentType, ctype), **cfg)
            for ctype, cfg in config_dict.get("ctype", {}).items()
            if ComponentTypeConfig.is_valid_type(ctype)
        }

    @property
    def meta(self) -> Metadata:
        """Return the metadata of the microgrid."""
        return self._metadata

    @property
    def assets(self) -> AssetsConfig:
        """Return the assets configuration of the microgrid."""
        return self._assets_cfg

    def component_types(self) -> list[str]:
        """Get a list of all component types in the configuration."""
        return list(self._component_types_cfg.keys())

    def component_type_ids(
        self,
        component_type: str,
        component_category: str | None = None,
        metric: str = "",
    ) -> list[int]:
        """Get a list of all component IDs for a component type.

        Args:
            component_type: Component type to be aggregated.
            component_category: Specific category of component IDs to retrieve
                (e.g., "meter", "inverter", or "component"). If not provided,
                the default logic is used.
            metric: Metric name of the formula if CIDs should be extracted from the formula.

        Returns:
            List of component IDs for this component type.

        Raises:
            ValueError: If the component type is unknown.
            KeyError: If `component_category` is invalid.
        """
        cfg = self._component_types_cfg.get(component_type)
        if not cfg:
            raise ValueError(f"{component_type} not found in config.")

        if component_category:
            valid_categories = get_args(ComponentCategory)
            if component_category not in valid_categories:
                raise KeyError(
                    f"Invalid component category: {component_category}. "
                    f"Valid categories are {valid_categories}"
                )
            category_ids = cast(list[int], getattr(cfg, component_category, []))
            return category_ids

        return cfg.cids(metric)

    def formula(self, component_type: str, metric: str) -> str:
        """Get the formula for a component type.

        Args:
            component_type: Component type to be aggregated.
            metric: Metric to be aggregated.

        Returns:
            Formula to be used for this aggregated component as string.

        Raises:
            ValueError: If the component type is unknown or formula is missing.
        """
        cfg = self._component_types_cfg.get(component_type)
        if not cfg:
            raise ValueError(f"{component_type} not found in config.")
        if cfg.formula is None:
            raise ValueError(f"No formula set for {component_type}")
        formula = cfg.formula.get(metric)
        if not formula:
            raise ValueError(f"{component_type} is missing formula for {metric}")

        return formula

    @staticmethod
    def load_configs(*paths: str) -> dict[str, "MicrogridConfig"]:
        """Load multiple microgrid configurations from a file.

        Configs for a single microgrid are expected to be in a single file.
        Later files with the same microgrid ID will overwrite the previous configs.

        Args:
            *paths: Path(es) to the config file(s).

        Returns:
            Dictionary of single microgrid formula configs with microgrid IDs as keys.
        """
        microgrid_configs = {}
        for config_path in paths:
            with open(config_path, "rb") as f:
                cfg_dict = tomllib.load(f)
                for microgrid_id, mcfg in cfg_dict.items():
                    microgrid_configs[microgrid_id] = MicrogridConfig(mcfg)
        return microgrid_configs
Attributes¤
assets property ¤
assets: AssetsConfig

Return the assets configuration of the microgrid.

meta property ¤
meta: Metadata

Return the metadata of the microgrid.

Functions¤
__init__ ¤
__init__(config_dict: dict[str, Any]) -> None

Initialize the microgrid configuration.

PARAMETER DESCRIPTION
config_dict

Dictionary with component type as key and config as value.

TYPE: dict[str, Any]

Source code in frequenz/data/microgrid/config.py
def __init__(self, config_dict: dict[str, Any]) -> None:
    """Initialize the microgrid configuration.

    Args:
        config_dict: Configuration of a single microgrid. The sections
            `meta`, `pv`, `wind`, `battery` and `ctype` are read; a
            missing or empty section is treated as an empty mapping.
    """
    self._metadata = Metadata(**(config_dict.get("meta") or {}))

    self._assets_cfg = AssetsConfig(
        pv=config_dict.get("pv") or {},
        wind=config_dict.get("wind") or {},
        battery=config_dict.get("battery") or {},
    )

    # Same `or {}` fallback as for the other sections, so that a section
    # that is present but set to `None` does not fail on `.items()`.
    ctype_section: dict[str, Any] = config_dict.get("ctype") or {}
    # Entries whose key is not a valid component type are skipped silently.
    self._component_types_cfg = {
        ctype: ComponentTypeConfig(component_type=cast(ComponentType, ctype), **cfg)
        for ctype, cfg in ctype_section.items()
        if ComponentTypeConfig.is_valid_type(ctype)
    }
component_type_ids ¤
component_type_ids(
    component_type: str,
    component_category: str | None = None,
    metric: str = "",
) -> list[int]

Get a list of all component IDs for a component type.

PARAMETER DESCRIPTION
component_type

Component type to be aggregated.

TYPE: str

component_category

Specific category of component IDs to retrieve (e.g., "meter", "inverter", or "component"). If not provided, the default logic is used.

TYPE: str | None DEFAULT: None

metric

Metric name of the formula if CIDs should be extracted from the formula.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
list[int]

List of component IDs for this component type.

RAISES DESCRIPTION
ValueError

If the component type is unknown.

KeyError

If component_category is invalid.

Source code in frequenz/data/microgrid/config.py
def component_type_ids(
    self,
    component_type: str,
    component_category: str | None = None,
    metric: str = "",
) -> list[int]:
    """Get a list of all component IDs for a component type.

    Args:
        component_type: Component type to be aggregated.
        component_category: Specific category of component IDs to retrieve
            (e.g., "meter", "inverter", or "component"). If not provided,
            the default logic is used.
        metric: Metric name of the formula if CIDs should be extracted from the formula.

    Returns:
        List of component IDs for this component type.

    Raises:
        ValueError: If the component type is unknown.
        KeyError: If `component_category` is invalid.
    """
    cfg = self._component_types_cfg.get(component_type)
    if not cfg:
        raise ValueError(f"{component_type} not found in config.")

    if component_category:
        valid_categories = get_args(ComponentCategory)
        if component_category not in valid_categories:
            raise KeyError(
                f"Invalid component category: {component_category}. "
                f"Valid categories are {valid_categories}"
            )
        category_ids = cast(list[int], getattr(cfg, component_category, []))
        return category_ids

    return cfg.cids(metric)
component_types ¤
component_types() -> list[str]

Get a list of all component types in the configuration.

Source code in frequenz/data/microgrid/config.py
def component_types(self) -> list[str]:
    """Return the names of all configured component types, in config order."""
    return [*self._component_types_cfg]
formula ¤
formula(component_type: str, metric: str) -> str

Get the formula for a component type.

PARAMETER DESCRIPTION
component_type

Component type to be aggregated.

TYPE: str

metric

Metric to be aggregated.

TYPE: str

RETURNS DESCRIPTION
str

Formula to be used for this aggregated component as string.

RAISES DESCRIPTION
ValueError

If the component type is unknown or formula is missing.

Source code in frequenz/data/microgrid/config.py
def formula(self, component_type: str, metric: str) -> str:
    """Look up the aggregation formula of a component type for a metric.

    Args:
        component_type: Component type to be aggregated.
        metric: Metric to be aggregated.

    Returns:
        Formula to be used for this aggregated component as string.

    Raises:
        ValueError: If the component type is unknown, if it has no formulas
            at all, or if no (non-empty) formula exists for `metric`.
    """
    type_cfg = self._component_types_cfg.get(component_type)
    if not type_cfg:
        raise ValueError(f"{component_type} not found in config.")

    formulas = type_cfg.formula
    if formulas is None:
        raise ValueError(f"No formula set for {component_type}")

    # An empty formula string is treated the same as a missing one.
    if not (expression := formulas.get(metric)):
        raise ValueError(f"{component_type} is missing formula for {metric}")

    return expression
load_configs staticmethod ¤
load_configs(*paths: str) -> dict[str, MicrogridConfig]

Load microgrid configurations from one or more files.

Configs for a single microgrid are expected to be in a single file. Later files with the same microgrid ID will overwrite the previous configs.

PARAMETER DESCRIPTION
*paths

Path(s) to the config file(s).

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
dict[str, MicrogridConfig]

Dictionary of single microgrid formula configs with microgrid IDs as keys.

Source code in frequenz/data/microgrid/config.py
@staticmethod
def load_configs(*paths: str) -> dict[str, "MicrogridConfig"]:
    """Load multiple microgrid configurations from a file.

    Configs for a single microgrid are expected to be in a single file.
    Later files with the same microgrid ID will overwrite the previous configs.

    Args:
        *paths: Path(es) to the config file(s).

    Returns:
        Dictionary of single microgrid formula configs with microgrid IDs as keys.
    """
    microgrid_configs = {}
    for config_path in paths:
        with open(config_path, "rb") as f:
            cfg_dict = tomllib.load(f)
            for microgrid_id, mcfg in cfg_dict.items():
                microgrid_configs[microgrid_id] = MicrogridConfig(mcfg)
    return microgrid_configs

frequenz.data.microgrid.MicrogridData ¤

Fetch power data for component types of a microgrid.

Source code in frequenz/data/microgrid/component_data.py
class MicrogridData:
    """Fetch power data for component types of a microgrid.

    Data is requested through a `ReportingApiClient`, using the formulas and
    component IDs from the microgrid configurations loaded at construction.
    """

    def __init__(self, server_url: str, key: str, microgrid_config_path: str) -> None:
        """Initialize microgrid data.

        Args:
            server_url: URL of the reporting service.
            key: Authentication key to the service.
            microgrid_config_path: Path to the config file with microgrid components.
        """
        self._client = ReportingApiClient(server_url=server_url, key=key)

        self._microgrid_configs = MicrogridConfig.load_configs(microgrid_config_path)

    @property
    def microgrid_ids(self) -> list[str]:
        """Get the microgrid IDs.

        Returns:
            List of microgrid IDs.
        """
        return list(self._microgrid_configs.keys())

    @property
    def microgrid_configs(self) -> dict[str, MicrogridConfig]:
        """Return the microgrid configurations."""
        return self._microgrid_configs

    # pylint: disable=too-many-locals
    async def metric_data(  # pylint: disable=too-many-arguments
        self,
        *,
        microgrid_id: int,
        start: datetime,
        end: datetime,
        component_types: tuple[str, ...] = ("grid", "pv", "battery"),
        resampling_period: timedelta = timedelta(seconds=10),
        metric: str = "AC_ACTIVE_POWER",
        keep_components: bool = False,
        splits: bool = False,
    ) -> pd.DataFrame | None:
        """Metric data for component types of a microgrid.

        Args:
            microgrid_id: Microgrid ID.
            start: Start timestamp.
            end: End timestamp.
            component_types: List of component types to be aggregated.
            resampling_period: Data resampling period.
            metric: Metric to be fetched. The name is upper-cased before it is
                looked up.
            keep_components: Include individual components in output.
            splits: Include `<column>_pos` and `<column>_neg` columns holding
                the values clipped at zero from below and above respectively.

        Returns:
            DataFrame indexed by timestamp with one column per component type
            (followed by the remaining columns in sorted order), or None if no
            data is available.

        Raises:
            KeyError: If no configuration exists for `microgrid_id` or if
                `metric` is not a member of `Metric`.
        """
        mcfg = self._microgrid_configs[f"{microgrid_id}"]

        # Normalize once so the formula lookup, the enum lookup and the
        # component ID lookup all use the same spelling of the metric.
        metric_name = metric.upper()

        formulas = {
            ctype: mcfg.formula(ctype, metric_name) for ctype in component_types
        }

        metric_enum = Metric[metric_name]
        data = [
            sample
            for formula in formulas.values()
            async for sample in self._client.receive_aggregated_data(
                microgrid_id=microgrid_id,
                metric=metric_enum,
                aggregation_formula=formula,
                start=start,
                end=end,
                resampling_period=resampling_period,
            )
        ]

        all_cids: list[int] = []
        if keep_components:
            all_cids = [
                cid
                for ctype in component_types
                for cid in mcfg.component_type_ids(ctype, metric=metric_name)
            ]
            _logger.debug("CIDs: %s", all_cids)
            microgrid_components = [
                (microgrid_id, all_cids),
            ]
            data_comp = [
                sample
                async for sample in self._client.list_microgrid_components_data(
                    microgrid_components=microgrid_components,
                    metrics=metric_enum,
                    start_dt=start,
                    end_dt=end,
                    resampling_period=resampling_period,
                )
            ]
            data.extend(data_comp)

        if not data:
            _logger.warning("No data found")
            return None

        df = pd.DataFrame(data)
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        assert df["timestamp"].dt.tz is not None, "Timestamps are not tz-aware"

        # Remove duplicates. The mask has one entry per row and is therefore
        # never empty here; `any()` tells whether a duplicate was flagged.
        dup_mask = df.duplicated(keep="first")
        if dup_mask.any():
            _logger.info("Found %s rows that have duplicates", dup_mask.sum())
        df = df[~dup_mask]

        # Pivot table
        df = df.pivot_table(index="timestamp", columns="component_id", values="value")
        # Rename formula columns to their component type. The first component
        # type wins when several types share the same formula.
        # NOTE(review): presumably aggregated samples carry the formula string
        # as `component_id` - confirm against the client.
        rename_cols: dict[str, str] = {}
        for ctype, formula in formulas.items():
            if formula in rename_cols:
                _logger.warning(
                    "Ignoring %s since formula %s exists already for %s",
                    ctype,
                    formula,
                    rename_cols[formula],
                )
                continue
            rename_cols[formula] = ctype

        df = df.rename(columns=rename_cols)
        if keep_components:
            # Set missing columns to NaN
            for cid in all_cids:
                if cid not in df.columns:
                    _logger.warning(
                        "Component ID %s not found in data, setting NaN", cid
                    )
                    df.loc[:, cid] = np.nan

        # Make string columns
        df.columns = [str(e) for e in df.columns]  # type: ignore

        if splits:
            # Capture the columns first so the new split columns are not
            # themselves split.
            cols = df.columns
            pos_cols = [f"{col}_pos" for col in cols]
            neg_cols = [f"{col}_neg" for col in cols]
            df[pos_cols] = df[cols].clip(lower=0)
            df[neg_cols] = df[cols].clip(upper=0)

        # Sort columns: component types first (in request order), rest sorted.
        ctypes = list(rename_cols.values())
        new_cols = [e for e in ctypes if e in df.columns] + sorted(
            [e for e in df.columns if e not in ctypes]
        )
        df = df[new_cols]

        return df

    async def ac_active_power(  # pylint: disable=too-many-arguments
        self,
        *,
        microgrid_id: int,
        start: datetime,
        end: datetime,
        component_types: tuple[str, ...] = ("grid", "pv", "battery"),
        resampling_period: timedelta = timedelta(seconds=10),
        keep_components: bool = False,
        splits: bool = False,
        unit: str = "kW",
    ) -> pd.DataFrame | None:
        """Power data for component types of a microgrid.

        Fetches the `AC_ACTIVE_POWER` metric via `metric_data` and scales it.

        Args:
            microgrid_id: Microgrid ID.
            start: Start timestamp.
            end: End timestamp.
            component_types: List of component types to be aggregated.
            resampling_period: Data resampling period.
            keep_components: Include individual components in output.
            splits: Include columns for positive and negative power values.
            unit: Unit of the output, one of "W" (values returned as fetched),
                "kW" (divided by 1000) or "MW" (divided by 1e6).

        Returns:
            DataFrame with the scaled power data or None if no data is available.

        Raises:
            ValueError: If data is available and `unit` is not supported.
        """
        df = await self.metric_data(
            microgrid_id=microgrid_id,
            start=start,
            end=end,
            component_types=component_types,
            resampling_period=resampling_period,
            metric="AC_ACTIVE_POWER",
            keep_components=keep_components,
            splits=splits,
        )
        if df is None:
            return df

        # "W" must be part of the same decision chain as the other units,
        # otherwise it falls through to the "unknown unit" error.
        if unit == "W":
            return df
        if unit == "kW":
            return df / 1000
        if unit == "MW":
            return df / 1e6
        raise ValueError(f"Unknown unit: {unit}")

    async def soc(  # pylint: disable=too-many-arguments
        self,
        *,
        microgrid_id: int,
        start: datetime,
        end: datetime,
        resampling_period: timedelta = timedelta(seconds=10),
        keep_components: bool = False,
    ) -> pd.DataFrame | None:
        """Soc data for the battery component type of a microgrid.

        Fetches the `BATTERY_SOC_PCT` metric via `metric_data`.

        Args:
            microgrid_id: Microgrid ID.
            start: Start timestamp.
            end: End timestamp.
            resampling_period: Data resampling period.
            keep_components: Include individual components in output.

        Returns:
            DataFrame with the data or None if no data is available.
        """
        df = await self.metric_data(
            microgrid_id=microgrid_id,
            start=start,
            end=end,
            component_types=("battery",),
            resampling_period=resampling_period,
            metric="BATTERY_SOC_PCT",
            keep_components=keep_components,
        )
        return df
Attributes¤
microgrid_configs property ¤
microgrid_configs: dict[str, MicrogridConfig]

Return the microgrid configurations.

microgrid_ids property ¤
microgrid_ids: list[str]

Get the microgrid IDs.

RETURNS DESCRIPTION
list[str]

List of microgrid IDs.

Functions¤
__init__ ¤
__init__(
    server_url: str, key: str, microgrid_config_path: str
) -> None

Initialize microgrid data.

PARAMETER DESCRIPTION
server_url

URL of the reporting service.

TYPE: str

key

Authentication key to the service.

TYPE: str

microgrid_config_path

Path to the config file with microgrid components.

TYPE: str

Source code in frequenz/data/microgrid/component_data.py
def __init__(self, server_url: str, key: str, microgrid_config_path: str) -> None:
    """Set up the reporting client and load the microgrid configurations.

    Args:
        server_url: URL of the reporting service.
        key: Authentication key to the service.
        microgrid_config_path: Path to the config file with microgrid components.
    """
    # Client used for all data requests of this instance.
    reporting_client = ReportingApiClient(server_url=server_url, key=key)
    self._client = reporting_client

    # Configurations keyed by microgrid ID.
    loaded_configs = MicrogridConfig.load_configs(microgrid_config_path)
    self._microgrid_configs = loaded_configs
ac_active_power async ¤
ac_active_power(
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = (
        "grid",
        "pv",
        "battery",
    ),
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False,
    splits: bool = False,
    unit: str = "kW"
) -> DataFrame | None

Power data for component types of a microgrid.

Source code in frequenz/data/microgrid/component_data.py
async def ac_active_power(  # pylint: disable=too-many-arguments
    self,
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = ("grid", "pv", "battery"),
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False,
    splits: bool = False,
    unit: str = "kW",
) -> pd.DataFrame | None:
    """Power data for component types of a microgrid."""
    df = await self.metric_data(
        microgrid_id=microgrid_id,
        start=start,
        end=end,
        component_types=component_types,
        resampling_period=resampling_period,
        metric="AC_ACTIVE_POWER",
        keep_components=keep_components,
        splits=splits,
    )
    if df is None:
        return df

    if unit == "W":
        pass
    if unit == "kW":
        df = df / 1000
    elif unit == "MW":
        df = df / 1e6
    else:
        raise ValueError(f"Unknown unit: {unit}")
    return df
metric_data async ¤
metric_data(
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = (
        "grid",
        "pv",
        "battery",
    ),
    resampling_period: timedelta = timedelta(seconds=10),
    metric: str = "AC_ACTIVE_POWER",
    keep_components: bool = False,
    splits: bool = False
) -> DataFrame | None

Power data for component types of a microgrid.

PARAMETER DESCRIPTION
microgrid_id

Microgrid ID.

TYPE: int

start

Start timestamp.

TYPE: datetime

end

End timestamp.

TYPE: datetime

component_types

List of component types to be aggregated.

TYPE: tuple[str, ...] DEFAULT: ('grid', 'pv', 'battery')

resampling_period

Data resampling period.

TYPE: timedelta DEFAULT: timedelta(seconds=10)

metric

Metric to be fetched.

TYPE: str DEFAULT: 'AC_ACTIVE_POWER'

keep_components

Include individual components in output.

TYPE: bool DEFAULT: False

splits

Include columns for positive and negative power values for components.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
DataFrame | None

DataFrame with power data of aggregated components

DataFrame | None

or None if no data is available

Source code in frequenz/data/microgrid/component_data.py
async def metric_data(  # pylint: disable=too-many-arguments
    self,
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    component_types: tuple[str, ...] = ("grid", "pv", "battery"),
    resampling_period: timedelta = timedelta(seconds=10),
    metric: str = "AC_ACTIVE_POWER",
    keep_components: bool = False,
    splits: bool = False,
) -> pd.DataFrame | None:
    """Metric data for component types of a microgrid.

    Args:
        microgrid_id: Microgrid ID.
        start: Start timestamp.
        end: End timestamp.
        component_types: List of component types to be aggregated.
        resampling_period: Data resampling period.
        metric: Metric to be fetched. The name is upper-cased before it is
            looked up.
        keep_components: Include individual components in output.
        splits: Include `<column>_pos` and `<column>_neg` columns holding
            the values clipped at zero from below and above respectively.

    Returns:
        DataFrame indexed by timestamp with one column per component type
        (followed by the remaining columns in sorted order), or None if no
        data is available.

    Raises:
        KeyError: If no configuration exists for `microgrid_id` or if
            `metric` is not a member of `Metric`.
    """
    mcfg = self._microgrid_configs[f"{microgrid_id}"]

    # Normalize once so the formula lookup, the enum lookup and the
    # component ID lookup all use the same spelling of the metric.
    metric_name = metric.upper()

    formulas = {
        ctype: mcfg.formula(ctype, metric_name) for ctype in component_types
    }

    metric_enum = Metric[metric_name]
    data = [
        sample
        for formula in formulas.values()
        async for sample in self._client.receive_aggregated_data(
            microgrid_id=microgrid_id,
            metric=metric_enum,
            aggregation_formula=formula,
            start=start,
            end=end,
            resampling_period=resampling_period,
        )
    ]

    all_cids: list[int] = []
    if keep_components:
        all_cids = [
            cid
            for ctype in component_types
            for cid in mcfg.component_type_ids(ctype, metric=metric_name)
        ]
        _logger.debug("CIDs: %s", all_cids)
        microgrid_components = [
            (microgrid_id, all_cids),
        ]
        data_comp = [
            sample
            async for sample in self._client.list_microgrid_components_data(
                microgrid_components=microgrid_components,
                metrics=metric_enum,
                start_dt=start,
                end_dt=end,
                resampling_period=resampling_period,
            )
        ]
        data.extend(data_comp)

    if not data:
        _logger.warning("No data found")
        return None

    df = pd.DataFrame(data)
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    assert df["timestamp"].dt.tz is not None, "Timestamps are not tz-aware"

    # Remove duplicates. The mask has one entry per row and is therefore
    # never empty here; `any()` tells whether a duplicate was flagged.
    dup_mask = df.duplicated(keep="first")
    if dup_mask.any():
        _logger.info("Found %s rows that have duplicates", dup_mask.sum())
    df = df[~dup_mask]

    # Pivot table
    df = df.pivot_table(index="timestamp", columns="component_id", values="value")
    # Rename formula columns to their component type. The first component
    # type wins when several types share the same formula.
    # NOTE(review): presumably aggregated samples carry the formula string
    # as `component_id` - confirm against the client.
    rename_cols: dict[str, str] = {}
    for ctype, formula in formulas.items():
        if formula in rename_cols:
            _logger.warning(
                "Ignoring %s since formula %s exists already for %s",
                ctype,
                formula,
                rename_cols[formula],
            )
            continue
        rename_cols[formula] = ctype

    df = df.rename(columns=rename_cols)
    if keep_components:
        # Set missing columns to NaN
        for cid in all_cids:
            if cid not in df.columns:
                _logger.warning(
                    "Component ID %s not found in data, setting NaN", cid
                )
                df.loc[:, cid] = np.nan

    # Make string columns
    df.columns = [str(e) for e in df.columns]  # type: ignore

    if splits:
        # Capture the columns first so the new split columns are not
        # themselves split.
        cols = df.columns
        pos_cols = [f"{col}_pos" for col in cols]
        neg_cols = [f"{col}_neg" for col in cols]
        df[pos_cols] = df[cols].clip(lower=0)
        df[neg_cols] = df[cols].clip(upper=0)

    # Sort columns: component types first (in request order), rest sorted.
    ctypes = list(rename_cols.values())
    new_cols = [e for e in ctypes if e in df.columns] + sorted(
        [e for e in df.columns if e not in ctypes]
    )
    df = df[new_cols]

    return df
soc async ¤
soc(
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False
) -> DataFrame | None

State of charge (SoC) data, i.e. the BATTERY_SOC_PCT metric, for the battery component type of a microgrid.

Source code in frequenz/data/microgrid/component_data.py
async def soc(  # pylint: disable=too-many-arguments
    self,
    *,
    microgrid_id: int,
    start: datetime,
    end: datetime,
    resampling_period: timedelta = timedelta(seconds=10),
    keep_components: bool = False,
) -> pd.DataFrame | None:
    """Soc data for component types of a microgrid."""
    df = await self.metric_data(
        microgrid_id=microgrid_id,
        start=start,
        end=end,
        component_types=("battery",),
        resampling_period=resampling_period,
        metric="BATTERY_SOC_PCT",
        keep_components=keep_components,
    )
    return df