Skip to content

weather

frequenz.client.weather ¤

Weather API Client for Python.

Classes¤

frequenz.client.weather.Client ¤

Bases: BaseApiClient[WeatherForecastServiceStub]

Weather forecast client.

Source code in frequenz/client/weather/_client.py
class Client(BaseApiClient[weather_pb2_grpc.WeatherForecastServiceStub]):
    """Weather forecast client.

    Provides streaming access to live forecasts and an iterator over
    historical forecasts. Live streams are cached per requested
    (locations, features) combination so multiple receivers share one
    underlying gRPC stream.
    """

    def __init__(
        self,
        server_url: str,
        *,
        connect: bool = True,
        channel_defaults: ChannelOptions = ChannelOptions(),
    ) -> None:
        """Initialize the client.

        Args:
            server_url: The URL of the server to connect to.
            connect: Whether to connect to the server as soon as a client instance is
                created. If `False`, the client will not connect to the server until
                [connect()][frequenz.client.base.client.BaseApiClient.connect] is
                called.
            channel_defaults: Default options for the gRPC channel.
        """
        super().__init__(
            server_url,
            weather_pb2_grpc.WeatherForecastServiceStub,
            connect=connect,
            channel_defaults=channel_defaults,
        )
        # Cache of live-forecast stream broadcasters, keyed by the exact
        # (locations..., features...) tuple requested. Entries are kept for
        # the lifetime of the client.
        self._streams: dict[
            tuple[Location | ForecastFeature, ...],
            GrpcStreamBroadcaster[
                weather_pb2.ReceiveLiveWeatherForecastResponse, Forecasts
            ],
        ] = {}

    @property
    def stub(self) -> weather_pb2_grpc.WeatherForecastServiceAsyncStub:
        """The gRPC stub for the API.

        Returns:
            The async gRPC stub for the Weather Forecast Service.

        Raises:
            ClientNotConnected: If the client is not connected to the server.
        """
        if self.channel is None or self._stub is None:
            raise ClientNotConnected(server_url=self.server_url, operation="stub")
        # This type: ignore is needed because we need to cast the sync stub to
        # the async stub, but we can't use cast because the async stub doesn't
        # actually exist in the eyes of the interpreter, it only exists for the
        # type-checker, so it can only be used for type hints.
        return self._stub  # type: ignore

    async def stream_live_forecast(
        self,
        locations: list[Location],
        features: list[ForecastFeature],
    ) -> Receiver[Forecasts]:
        """Stream live weather forecast data.

        Args:
            locations: locations to stream data for.
            features: features to stream data for.

        Returns:
            A channel receiver for weather forecast data.
        """
        # Concatenating two tuples already yields a tuple, so no extra
        # tuple() wrapper is needed. Locations and features are distinct
        # types, so the flat key cannot be ambiguous.
        stream_key = tuple(locations) + tuple(features)

        if stream_key not in self._streams:
            self._streams[stream_key] = GrpcStreamBroadcaster(
                f"weather-forecast-{stream_key}",
                lambda: self.stub.ReceiveLiveWeatherForecast(
                    weather_pb2.ReceiveLiveWeatherForecastRequest(
                        locations=(location.to_pb() for location in locations),
                        features=(feature.value for feature in features),
                    )
                ),
                Forecasts.from_pb,
            )
        return self._streams[stream_key].new_receiver()

    def hist_forecast_iterator(
        self,
        locations: list[Location],
        features: list[ForecastFeature],
        start: datetime,
        end: datetime,
    ) -> HistoricalForecastIterator:
        """Stream historical weather forecast data.

        Args:
            locations: locations to stream data for.
            features: features to stream data for.
            start: start of the time range to stream data for.
            end: end of the time range to stream data for.

        Returns:
            An iterator over historical weather forecast data.
        """
        return HistoricalForecastIterator(self.stub, locations, features, start, end)
Attributes¤
channel property ¤
channel: Channel

The underlying gRPC channel used to communicate with the server.

Warning

This channel is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

channel_defaults property ¤
channel_defaults: ChannelOptions

The default options for the gRPC channel.

is_connected property ¤
is_connected: bool

Whether the client is connected to the server.

server_url property ¤
server_url: str

The URL of the server.

stub property ¤
stub: WeatherForecastServiceAsyncStub

The gRPC stub for the API.

RETURNS DESCRIPTION
WeatherForecastServiceAsyncStub

The async gRPC stub for the Weather Forecast Service.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter a context manager.

Source code in frequenz/client/base/client.py
async def __aenter__(self) -> Self:
    """Enter a context manager.

    Connects to the server (a no-op if a matching connection already
    exists) and returns the client itself.
    """
    self.connect()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None

Exit a context manager.

Source code in frequenz/client/base/client.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None:
    """Exit a context manager.

    Closes the underlying gRPC channel (if any) and clears the cached
    channel and stub.
    """
    if self._channel is None:
        # Never connected (or already disconnected): nothing to close.
        return None
    result = await self._channel.__aexit__(_exc_type, _exc_val, _exc_tb)
    # References are dropped only after the close above completed, so a
    # failing close leaves the client state untouched.
    self._channel = None
    self._stub = None
    return result
__init__ ¤
__init__(
    server_url: str,
    *,
    connect: bool = True,
    channel_defaults: ChannelOptions = ChannelOptions()
) -> None

Initialize the client.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to.

TYPE: str

connect

Whether to connect to the server as soon as a client instance is created. If False, the client will not connect to the server until connect() is called.

TYPE: bool DEFAULT: True

channel_defaults

Default options for the gRPC channel.

TYPE: ChannelOptions DEFAULT: ChannelOptions()

Source code in frequenz/client/weather/_client.py
def __init__(
    self,
    server_url: str,
    *,
    connect: bool = True,
    channel_defaults: ChannelOptions = ChannelOptions(),
) -> None:
    """Initialize the client.

    Args:
        server_url: The URL of the server to connect to.
        connect: Whether to connect to the server as soon as a client instance is
            created. If `False`, the client will not connect to the server until
            [connect()][frequenz.client.base.client.BaseApiClient.connect] is
            called.
        channel_defaults: Default options for the gRPC channel.
    """
    super().__init__(
        server_url,
        weather_pb2_grpc.WeatherForecastServiceStub,
        connect=connect,
        channel_defaults=channel_defaults,
    )
    # Cache of live-forecast stream broadcasters, keyed by the requested
    # (locations..., features...) tuple; entries live for the client lifetime.
    self._streams: dict[
        tuple[Location | ForecastFeature, ...],
        GrpcStreamBroadcaster[
            weather_pb2.ReceiveLiveWeatherForecastResponse, Forecasts
        ],
    ] = {}
connect ¤
connect(
    server_url: str | None = None,
    *,
    auth_key: str | None | EllipsisType = ...,
    sign_secret: str | None | EllipsisType = ...
) -> None

Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL, this method does nothing. If you want to force a reconnection, you can call disconnect() first.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to. If not provided, the previously used URL is used.

TYPE: str | None DEFAULT: None

auth_key

The API key to use when connecting to the service. If an Ellipsis is provided, the previously used auth_key is used.

TYPE: str | None | EllipsisType DEFAULT: ...

sign_secret

The secret to use when creating message HMAC. If an Ellipsis is provided, the previously used sign_secret is used.

TYPE: str | None | EllipsisType DEFAULT: ...

Source code in frequenz/client/base/client.py
def connect(
    self,
    server_url: str | None = None,
    *,
    auth_key: str | None | EllipsisType = ...,
    sign_secret: str | None | EllipsisType = ...,
) -> None:
    """Connect to the server, possibly using a new URL.

    If the client is already connected and the URL is the same as the previous URL,
    this method does nothing. If you want to force a reconnection, you can call
    [disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

    Args:
        server_url: The URL of the server to connect to. If not provided, the
            previously used URL is used.
        auth_key: The API key to use when connecting to the service. If an Ellipsis
            is provided, the previously used auth_key is used.
        sign_secret: The secret to use when creating message HMAC. If an Ellipsis is
            provided, the previously used sign_secret is used.
    """
    # Ellipsis (...) acts as a sentinel meaning "keep the previous value", so
    # an explicit None (disable) can be distinguished from "unchanged".
    reconnect = False
    if server_url is not None and server_url != self._server_url:  # URL changed
        self._server_url = server_url
        reconnect = True
    if auth_key is not ... and auth_key != self._auth_key:
        self._auth_key = auth_key
        reconnect = True
    if sign_secret is not ... and sign_secret != self._sign_secret:
        self._sign_secret = sign_secret
        reconnect = True
    if self.is_connected and not reconnect:  # Desired connection already exists
        return

    # Interceptors are only installed when the corresponding credential is set.
    interceptors: list[ClientInterceptor] = []
    if self._auth_key is not None:
        interceptors += [
            AuthenticationInterceptorUnaryUnary(self._auth_key),  # type: ignore [list-item]
            AuthenticationInterceptorUnaryStream(self._auth_key),  # type: ignore [list-item]
        ]
    if self._sign_secret is not None:
        interceptors += [
            SigningInterceptorUnaryUnary(self._sign_secret),  # type: ignore [list-item]
            SigningInterceptorUnaryStream(self._sign_secret),  # type: ignore [list-item]
        ]

    # NOTE(review): on reconnect the previous channel is overwritten here
    # without being closed — confirm the old channel is cleaned up elsewhere.
    self._channel = parse_grpc_uri(
        self._server_url,
        interceptors,
        defaults=self._channel_defaults,
    )
    self._stub = self._create_stub(self._channel)
disconnect async ¤
disconnect() -> None

Disconnect from the server.

If the client is not connected, this method does nothing.

Source code in frequenz/client/base/client.py
async def disconnect(self) -> None:
    """Disconnect from the server.

    If the client is not connected, this method does nothing.
    """
    # Reuse the context-manager exit path, which closes the channel and
    # clears the cached channel/stub.
    await self.__aexit__(None, None, None)
hist_forecast_iterator ¤
hist_forecast_iterator(
    locations: list[Location],
    features: list[ForecastFeature],
    start: datetime,
    end: datetime,
) -> HistoricalForecastIterator

Stream historical weather forecast data.

PARAMETER DESCRIPTION
locations

locations to stream data for.

TYPE: list[Location]

features

features to stream data for.

TYPE: list[ForecastFeature]

start

start of the time range to stream data for.

TYPE: datetime

end

end of the time range to stream data for.

TYPE: datetime

RETURNS DESCRIPTION
HistoricalForecastIterator

An iterator over historical weather forecast data.

Source code in frequenz/client/weather/_client.py
def hist_forecast_iterator(
    self,
    locations: list[Location],
    features: list[ForecastFeature],
    start: datetime,
    end: datetime,
) -> HistoricalForecastIterator:
    """Stream historical weather forecast data.

    Args:
        locations: locations to stream data for.
        features: features to stream data for.
        start: start of the time range to stream data for.
        end: end of the time range to stream data for.

    Returns:
        An iterator over historical weather forecast data.
    """
    return HistoricalForecastIterator(self.stub, locations, features, start, end)
stream_live_forecast async ¤
stream_live_forecast(
    locations: list[Location],
    features: list[ForecastFeature],
) -> Receiver[Forecasts]

Stream live weather forecast data.

PARAMETER DESCRIPTION
locations

locations to stream data for.

TYPE: list[Location]

features

features to stream data for.

TYPE: list[ForecastFeature]

RETURNS DESCRIPTION
Receiver[Forecasts]

A channel receiver for weather forecast data.

Source code in frequenz/client/weather/_client.py
async def stream_live_forecast(
    self,
    locations: list[Location],
    features: list[ForecastFeature],
) -> Receiver[Forecasts]:
    """Stream live weather forecast data.

    Args:
        locations: locations to stream data for.
        features: features to stream data for.

    Returns:
        A channel receiver for weather forecast data.
    """
    # Concatenating two tuples already yields a tuple, so no extra tuple()
    # wrapper is needed. Locations and features are distinct types, so the
    # flat key cannot be ambiguous.
    stream_key = tuple(locations) + tuple(features)

    if stream_key not in self._streams:
        # First request for this combination: start one shared gRPC stream
        # that all receivers for this key will fan out from.
        self._streams[stream_key] = GrpcStreamBroadcaster(
            f"weather-forecast-{stream_key}",
            lambda: self.stub.ReceiveLiveWeatherForecast(
                weather_pb2.ReceiveLiveWeatherForecastRequest(
                    locations=(location.to_pb() for location in locations),
                    features=(feature.value for feature in features),
                )
            ),
            Forecasts.from_pb,
        )
    return self._streams[stream_key].new_receiver()

frequenz.client.weather.ForecastFeature ¤

Bases: Enum

Weather forecast features available through the API.

Source code in frequenz/client/weather/_types.py
class ForecastFeature(enum.Enum):
    """Weather forecast features available through the API."""

    UNSPECIFIED = weather_pb2.ForecastFeature.FORECAST_FEATURE_UNSPECIFIED
    """Unspecified forecast feature."""

    TEMPERATURE_2_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_TEMPERATURE_2_METRE
    )
    """Temperature at 2m above the earth's surface."""

    U_WIND_COMPONENT_100_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_U_WIND_COMPONENT_100_METRE
    )
    """Eastward wind component at 100m altitude."""

    V_WIND_COMPONENT_100_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_V_WIND_COMPONENT_100_METRE
    )
    """Northward wind component at 100m altitude."""

    U_WIND_COMPONENT_10_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_U_WIND_COMPONENT_10_METRE
    )
    """Eastward wind component at 10m altitude."""

    V_WIND_COMPONENT_10_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_V_WIND_COMPONENT_10_METRE
    )
    """Northward wind component at 10m altitude."""

    SURFACE_SOLAR_RADIATION_DOWNWARDS = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_SURFACE_SOLAR_RADIATION_DOWNWARDS
    )
    """Surface solar radiation downwards."""

    SURFACE_NET_SOLAR_RADIATION = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_SURFACE_NET_SOLAR_RADIATION
    )
    """Surface net solar radiation."""

    @classmethod
    def from_pb(
        cls, forecast_feature: weather_pb2.ForecastFeature.ValueType
    ) -> ForecastFeature:
        """Convert a protobuf ForecastFeature value to ForecastFeature enum.

        Unknown values are mapped to `UNSPECIFIED` (with a warning) instead
        of raising, so newer server-side features don't break this client.

        Args:
            forecast_feature: protobuf forecast feature to convert.

        Returns:
            Enum value corresponding to the protobuf message.
        """
        try:
            # EAFP: the enum's value-to-member lookup is O(1), instead of
            # scanning every member on each conversion.
            return cls(forecast_feature)
        except ValueError:
            _logger.warning(
                "Unknown forecast feature %s. Returning UNSPECIFIED.", forecast_feature
            )
            return cls.UNSPECIFIED
Attributes¤
SURFACE_NET_SOLAR_RADIATION class-attribute instance-attribute ¤
SURFACE_NET_SOLAR_RADIATION = (
    FORECAST_FEATURE_SURFACE_NET_SOLAR_RADIATION
)

Surface net solar radiation.

SURFACE_SOLAR_RADIATION_DOWNWARDS class-attribute instance-attribute ¤
SURFACE_SOLAR_RADIATION_DOWNWARDS = (
    FORECAST_FEATURE_SURFACE_SOLAR_RADIATION_DOWNWARDS
)

Surface solar radiation downwards.

TEMPERATURE_2_METRE class-attribute instance-attribute ¤
TEMPERATURE_2_METRE = FORECAST_FEATURE_TEMPERATURE_2_METRE

Temperature at 2m above the earth's surface.

UNSPECIFIED class-attribute instance-attribute ¤
UNSPECIFIED = FORECAST_FEATURE_UNSPECIFIED

Unspecified forecast feature.

U_WIND_COMPONENT_100_METRE class-attribute instance-attribute ¤
U_WIND_COMPONENT_100_METRE = (
    FORECAST_FEATURE_U_WIND_COMPONENT_100_METRE
)

Eastward wind component at 100m altitude.

U_WIND_COMPONENT_10_METRE class-attribute instance-attribute ¤
U_WIND_COMPONENT_10_METRE = (
    FORECAST_FEATURE_U_WIND_COMPONENT_10_METRE
)

Eastward wind component at 10m altitude.

V_WIND_COMPONENT_100_METRE class-attribute instance-attribute ¤
V_WIND_COMPONENT_100_METRE = (
    FORECAST_FEATURE_V_WIND_COMPONENT_100_METRE
)

Northward wind component at 100m altitude.

V_WIND_COMPONENT_10_METRE class-attribute instance-attribute ¤
V_WIND_COMPONENT_10_METRE = (
    FORECAST_FEATURE_V_WIND_COMPONENT_10_METRE
)

Northward wind component at 10m altitude.

Functions¤
from_pb classmethod ¤
from_pb(forecast_feature: ValueType) -> ForecastFeature

Convert a protobuf ForecastFeature value to ForecastFeature enum.

PARAMETER DESCRIPTION
forecast_feature

protobuf forecast feature to convert.

TYPE: ValueType

RETURNS DESCRIPTION
ForecastFeature

Enum value corresponding to the protobuf message.

Source code in frequenz/client/weather/_types.py
@classmethod
def from_pb(
    cls, forecast_feature: weather_pb2.ForecastFeature.ValueType
) -> ForecastFeature:
    """Convert a protobuf ForecastFeature value to ForecastFeature enum.

    Unknown values are mapped to `UNSPECIFIED` (with a warning) instead of
    raising, so newer server-side features don't break this client.

    Args:
        forecast_feature: protobuf forecast feature to convert.

    Returns:
        Enum value corresponding to the protobuf message.
    """
    try:
        # EAFP: the enum's value-to-member lookup is O(1), instead of
        # scanning every member on each conversion.
        return cls(forecast_feature)
    except ValueError:
        _logger.warning(
            "Unknown forecast feature %s. Returning UNSPECIFIED.", forecast_feature
        )
        return cls.UNSPECIFIED

frequenz.client.weather.Forecasts dataclass ¤

Weather forecast data.

Source code in frequenz/client/weather/_types.py
@dataclass(frozen=True)
class Forecasts:
    """Weather forecast data.

    Thin immutable wrapper around the raw protobuf response, with helpers to
    convert the data into (time, location, feature) numpy arrays and to
    resample it to arbitrary validity times.
    """

    # Raw protobuf response wrapped by this class; all accessors below read
    # directly from this message.
    _forecasts_pb: weather_pb2.ReceiveLiveWeatherForecastResponse

    @classmethod
    def from_pb(
        cls, forecasts: weather_pb2.ReceiveLiveWeatherForecastResponse
    ) -> Forecasts:
        """Convert a protobuf Forecast message to Forecast object.

        Args:
            forecasts: protobuf message with live forecast data.

        Returns:
            Forecast object corresponding to the protobuf message.
        """
        return cls(_forecasts_pb=forecasts)

    def to_resampled_ndarray(
        self,
        validity_times: list[dt.datetime],
        locations: list[Location] | None = None,
        features: list[ForecastFeature] | None = None,
        solar_offset_sec: int = 1800,
    ) -> np.ndarray[
        # the shape is known to be 3 dimensional, but the length of each dimension is
        # not fixed, so we use typing.Any, instead of the usual const generic
        # parameters.
        tuple[typing.Any, typing.Any, typing.Any],
        np.dtype[np.float64],
    ]:
        """Convert the forecast to a numpy array and resample to the specified validity_times.

        Args:
            validity_times: The validity times to resample to.
            locations: The locations to filter by.
            features: The features to filter by.
            solar_offset_sec: Time offset in seconds to shift solar forecasts

        Returns:
            Numpy array of shape (num_validity_times, num_locations, num_features)
        """
        original_validity_times = self._get_validity_times()
        # Extract all validity times (filtering only by location/feature);
        # the time axis is resampled below instead.
        array = self.to_ndarray_vlf(None, locations, features)
        if not features:
            # No explicit feature filter: upsample_vlf still needs the feature
            # list to distinguish solar from non-solar columns.
            features = self._get_features()

        resampled_array = self.upsample_vlf(
            array,
            original_validity_times,
            validity_times,
            features,
            solar_offset_sec,
        )

        return resampled_array

    def _get_features(self) -> list[ForecastFeature]:
        """Return the available features in the Forecast.

        Returns:
            List of forecast features.
        """
        if not self._forecasts_pb.location_forecasts:
            return []
        # Features need to only be extracted from one validity time
        # from one location as they are equal across all
        first_location = self._forecasts_pb.location_forecasts[0]
        if not first_location.forecasts:
            return []

        first_validity_time = first_location.forecasts[0]

        return [
            ForecastFeature.from_pb(feature.feature)
            for feature in first_validity_time.features
        ]

    def _get_validity_times(self) -> list[dt.datetime]:
        """Get validity times of the forecasts.

        Returns:
            List of validity times (timezone-aware, UTC).
        """
        # All location_forecasts have the same validity times
        first_location = self._forecasts_pb.location_forecasts[0]
        validity_times = []

        for fc in first_location.forecasts:
            # dt.UTC requires Python >= 3.11. These datetimes are
            # timezone-aware, unlike Timestamp.ToDatetime() used in
            # to_ndarray_vlf below.
            validity_times.append(
                dt.datetime.fromtimestamp(fc.valid_at_ts.seconds, tz=dt.UTC)
            )

        return validity_times

    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    def to_ndarray_vlf(
        self,
        validity_times: list[dt.datetime] | None = None,
        locations: list[Location] | None = None,
        features: list[ForecastFeature] | None = None,
    ) -> np.ndarray[
        # the shape is known to be 3 dimensional, but the length of each dimension is
        # not fixed, so we use typing.Any, instead of the usual const generic
        # parameters.
        tuple[typing.Any, typing.Any, typing.Any],
        np.dtype[np.float64],
    ]:
        """Convert a Forecast object to numpy array and use NaN to mark irrelevant data.

        If any of the filters are None, all values for that parameter will be returned.

        Args:
            validity_times: The validity times to filter by.
            locations: The locations to filter by.
            features: The features to filter by.

        Returns:
            Numpy array of shape (num_validity_times, num_locations, num_features)

        Raises:
            ValueError: If the forecasts data is missing or invalid.
            RuntimeError: If there is an error processing the forecast data.
        """
        # check for empty forecasts data
        if not self._forecasts_pb.location_forecasts:
            raise ValueError("Forecast data is missing or invalid.")

        try:
            num_times = len(self._forecasts_pb.location_forecasts[0].forecasts)
            num_locations = len(self._forecasts_pb.location_forecasts)
            num_features = len(
                self._forecasts_pb.location_forecasts[0].forecasts[0].features
            )

            # Look for the proto indexes of the filtered times, locations and features
            # A filter entry that cannot be matched is recorded as index -1 so
            # the output keeps the requested shape, with NaN in those cells.
            location_indexes = []
            validity_times_indexes = []
            feature_indexes = []

            # get the location indexes of the proto for the filtered locations
            if locations:
                for location in locations:
                    found = False
                    for l_index, location_forecast in enumerate(
                        self._forecasts_pb.location_forecasts
                    ):
                        if location == Location.from_pb(location_forecast.location):
                            location_indexes.append(l_index)
                            found = True
                            break
                    if not found:
                        # remember position of missing location
                        location_indexes.append(-1)
            else:
                location_indexes = list(range(num_locations))

            # get the val indexes of the proto for the filtered validity times
            if validity_times:
                for req_validitiy_time in validity_times:
                    found = False
                    for t_index, val_time in enumerate(
                        self._forecasts_pb.location_forecasts[0].forecasts
                    ):
                        # NOTE(review): ToDatetime() returns a *naive* datetime,
                        # while _get_validity_times() produces aware (UTC) ones;
                        # aware == naive is always False, so timezone-aware
                        # requested times can never match here — confirm intended.
                        if req_validitiy_time == val_time.valid_at_ts.ToDatetime():
                            validity_times_indexes.append(t_index)
                            found = True
                            break
                    if not found:
                        # remember position of missing validity time
                        validity_times_indexes.append(-1)
            else:
                validity_times_indexes = list(range(num_times))

            # get the feature indexes of the proto for the filtered features
            if features:
                for req_feature in features:
                    found = False
                    for f_index, feature in enumerate(
                        self._forecasts_pb.location_forecasts[0].forecasts[0].features
                    ):
                        if req_feature == ForecastFeature.from_pb(feature.feature):
                            feature_indexes.append(f_index)
                            found = True
                            break
                    if not found:
                        # remember position of missing feature
                        feature_indexes.append(-1)
            else:
                feature_indexes = list(range(num_features))

            # Start from an all-NaN array; cells for missing keys stay NaN.
            array = np.full(
                (
                    len(validity_times_indexes),
                    len(location_indexes),
                    len(feature_indexes),
                ),
                np.nan,
            )

            array_l_index = 0

            for l_index in location_indexes:
                array_t_index = 0

                for t_index in validity_times_indexes:
                    array_f_index = 0

                    for f_index in feature_indexes:
                        # This fails if there was data missing for at least one of the
                        # keys and we don't update the array but leave it as NaN
                        if l_index >= 0 and t_index >= 0 and f_index >= 0:
                            array[array_t_index, array_l_index, array_f_index] = (
                                self._forecasts_pb.location_forecasts[l_index]
                                .forecasts[t_index]
                                .features[f_index]
                                .value
                            )
                        array_f_index += 1

                    array_t_index += 1

                array_l_index += 1

            # Check if the array shape matches the number of filtered times, locations
            # and features
            # NOTE(review): these ValueErrors are raised inside the try block, so
            # they reach callers converted to RuntimeError by the handler below.
            if validity_times is not None and array.shape[0] != len(validity_times):
                raise ValueError(
                    f"The count of validity times in the array({array.shape[0]}) does "
                    f"not match the requested validity times count ({len(validity_times)})"
                )
            if locations is not None and array.shape[1] != len(locations):
                raise ValueError(
                    f"The count of location in the array ({array.shape[1]}) does not "
                    f"match the requested location count ({len(locations)})"
                )
            if features is not None and array.shape[2] != len(features):
                raise ValueError(
                    f"The count of features in the array ({array.shape[2]}) does not "
                    f"match the requested feature count ({len(features)})"
                )

        # catch all exceptions
        except Exception as e:
            raise RuntimeError("Error processing forecast data") from e

        return array

    # pylint: disable= too-many-arguments, too-many-positional-arguments
    def upsample_vlf(
        self,
        array: np.ndarray[
            tuple[typing.Any, typing.Any, typing.Any], np.dtype[np.float64]
        ],
        validity_times: list[dt.datetime],
        target_times: list[dt.datetime],
        features: list[ForecastFeature],
        solar_offset_sec: int = 1800,
    ) -> np.ndarray[tuple[typing.Any, typing.Any, typing.Any], np.dtype[np.float64]]:
        """Upsample the forecast array to requested timestamps.

        Args:
            array: 3D array from to_ndarray_vlf (time, location, feature)
            validity_times: List of original timestamps
            target_times: List of desired timestamps to interpolate to
            features: List of forecast features
            solar_offset_sec: Time offset in seconds to shift solar forecasts

        Returns:
            Resampled 3D array with same structure interpolated to target timestamps

        Raises:
            ValueError: If input dimensions don't match or timestamps aren't monotonic
        """
        # Check input dimensions
        if array.shape[0] != len(validity_times):
            raise ValueError(
                f"Time dimension of input array ({array.shape[0]}) does not match "
                f"number of validity times ({len(validity_times)})"
            )
        if array.shape[2] != len(features):
            raise ValueError(
                f"Feature dimension of input array ({array.shape[2]}) does not match "
                f"number of features ({len(features)})"
            )

        # Validate target timestamps are strictly increasing
        if not all(t1 < t2 for t1, t2 in zip(target_times[:-1], target_times[1:])):
            raise ValueError("target_times must be strictly increasing")

        # Interpolation is done on POSIX timestamps (seconds since epoch).
        vts = np.array([t.timestamp() for t in validity_times])
        tts = np.array([t.timestamp() for t in target_times])

        resampled = np.zeros((len(target_times), array.shape[1], array.shape[2]))

        # Get indices of solar and non-solar features
        solar_idxs = [i for i, f in enumerate(features) if f in SOLAR_PARAMETERS]
        other_idxs = [i for i, f in enumerate(features) if f not in SOLAR_PARAMETERS]

        # Handle non-solar features with direct interpolation
        if other_idxs:
            # np.interp(..., left=nan, right=nan): targets outside the source
            # time range become NaN instead of being clamped to the edges.
            resampled[..., other_idxs] = np.apply_along_axis(
                lambda x: np.interp(tts, vts, x, np.nan, np.nan),
                axis=0,
                arr=array[..., other_idxs],
            )

        # Handle solar features with shifted interpolation
        if solar_idxs:
            # Shift validity times by solar_offset_sec for solar parameters
            shifted_vts = vts + solar_offset_sec
            resampled[..., solar_idxs] = np.apply_along_axis(
                lambda x: np.interp(
                    tts,
                    shifted_vts,
                    x,
                    np.nan,
                    np.nan,
                ),
                axis=0,
                arr=array[..., solar_idxs],
            )

        return resampled

    def flatten(self) -> list[ForecastData]:
        """Flatten forecast data into a list of ForecastData tuples.

        Returns:
            List of ForecastData tuples containing the flattened forecast data.

        Raises:
            ValueError: If the forecasts data is missing or invalid.
        """
        # check for empty forecasts data
        if not self._forecasts_pb.location_forecasts:
            raise ValueError("Forecast data is missing or invalid.")

        # `flatten` below resolves to the module-level helper function (class
        # attributes are not in scope inside method bodies), not to this method.
        return flatten(list(self._forecasts_pb.location_forecasts))
Functions¤
flatten ¤
flatten() -> list[ForecastData]

Flatten forecast data into a list of ForecastData tuples.

RETURNS DESCRIPTION
list[ForecastData]

List of ForecastData tuples containing the flattened forecast data.

RAISES DESCRIPTION
ValueError

If the forecasts data is missing or invalid.

Source code in frequenz/client/weather/_types.py
def flatten(self) -> list[ForecastData]:
    """Return the forecast data flattened into ForecastData tuples.

    Returns:
        List of ForecastData tuples containing the flattened forecast data.

    Raises:
        ValueError: If the forecasts data is missing or invalid.
    """
    # Guard against a response with no per-location forecasts at all.
    if not self._forecasts_pb.location_forecasts:
        raise ValueError("Forecast data is missing or invalid.")
    forecasts = list(self._forecasts_pb.location_forecasts)
    # Delegate to the module-level flatten() helper.
    return flatten(forecasts)
from_pb classmethod ¤
from_pb(
    forecasts: ReceiveLiveWeatherForecastResponse,
) -> Forecasts

Convert a protobuf Forecast message to a Forecast object.

PARAMETER DESCRIPTION
forecasts

protobuf message with live forecast data.

TYPE: ReceiveLiveWeatherForecastResponse

RETURNS DESCRIPTION
Forecasts

Forecast object corresponding to the protobuf message.

Source code in frequenz/client/weather/_types.py
@classmethod
def from_pb(
    cls, forecasts: weather_pb2.ReceiveLiveWeatherForecastResponse
) -> Forecasts:
    """Wrap a protobuf live-forecast response in a Forecasts object.

    Args:
        forecasts: protobuf message with live forecast data.

    Returns:
        Forecast object corresponding to the protobuf message.
    """
    instance = cls(_forecasts_pb=forecasts)
    return instance
to_ndarray_vlf ¤
to_ndarray_vlf(
    validity_times: list[datetime] | None = None,
    locations: list[Location] | None = None,
    features: list[ForecastFeature] | None = None,
) -> ndarray[tuple[Any, Any, Any], dtype[float64]]

Convert a Forecast object to a numpy array and use NaN to mark irrelevant data.

If any of the filters are None, all values for that parameter will be returned.

PARAMETER DESCRIPTION
validity_times

The validity times to filter by.

TYPE: list[datetime] | None DEFAULT: None

locations

The locations to filter by.

TYPE: list[Location] | None DEFAULT: None

features

The features to filter by.

TYPE: list[ForecastFeature] | None DEFAULT: None

RETURNS DESCRIPTION
ndarray[tuple[Any, Any, Any], dtype[float64]]

Numpy array of shape (num_validity_times, num_locations, num_features)

RAISES DESCRIPTION
ValueError

If the forecasts data is missing or invalid.

RuntimeError

If there is an error processing the forecast data.

Source code in frequenz/client/weather/_types.py
def to_ndarray_vlf(
    self,
    validity_times: list[dt.datetime] | None = None,
    locations: list[Location] | None = None,
    features: list[ForecastFeature] | None = None,
) -> np.ndarray[
    # the shape is known to be 3 dimensional, but the length of each dimension is
    # not fixed, so we use typing.Any, instead of the usual const generic
    # parameters.
    tuple[typing.Any, typing.Any, typing.Any],
    np.dtype[np.float64],
]:
    """Convert a Forecast object to numpy array and use NaN to mark irrelevant data.

    If any of the filters are None, all values for that parameter will be returned.

    Args:
        validity_times: The validity times to filter by.
        locations: The locations to filter by.
        features: The features to filter by.

    Returns:
        Numpy array of shape (num_validity_times, num_locations, num_features)

    Raises:
        ValueError: If the forecasts data is missing or invalid.
        RuntimeError: If there is an error processing the forecast data.
    """
    # check for empty forecasts data
    if not self._forecasts_pb.location_forecasts:
        raise ValueError("Forecast data is missing or invalid.")

    try:
        location_forecasts = self._forecasts_pb.location_forecasts
        num_times = len(location_forecasts[0].forecasts)
        num_locations = len(location_forecasts)
        num_features = len(location_forecasts[0].forecasts[0].features)

        def _match_indexes(requested, available, key):
            """Map each requested key to its proto index; -1 marks a missing key."""
            indexes = []
            for wanted in requested:
                # for/else: the else branch runs only when no match was found
                for index, candidate in enumerate(available):
                    if wanted == key(candidate):
                        indexes.append(index)
                        break
                else:
                    # remember position of the missing entry so NaN is kept there
                    indexes.append(-1)
            return indexes

        # Look up the proto indexes of the filtered locations, times and
        # features; an unfiltered dimension keeps all proto indexes in order.
        location_indexes = (
            _match_indexes(
                locations,
                location_forecasts,
                lambda forecast: Location.from_pb(forecast.location),
            )
            if locations
            else list(range(num_locations))
        )
        validity_times_indexes = (
            _match_indexes(
                validity_times,
                location_forecasts[0].forecasts,
                lambda forecast: forecast.valid_at_ts.ToDatetime(),
            )
            if validity_times
            else list(range(num_times))
        )
        feature_indexes = (
            _match_indexes(
                features,
                location_forecasts[0].forecasts[0].features,
                lambda feature_pb: ForecastFeature.from_pb(feature_pb.feature),
            )
            if features
            else list(range(num_features))
        )

        array = np.full(
            (
                len(validity_times_indexes),
                len(location_indexes),
                len(feature_indexes),
            ),
            np.nan,
        )

        # Fill the array; any position whose proto index is -1 (requested key
        # missing from the data) is left as NaN.
        for array_l_index, l_index in enumerate(location_indexes):
            if l_index < 0:
                continue
            forecasts = location_forecasts[l_index].forecasts
            for array_t_index, t_index in enumerate(validity_times_indexes):
                if t_index < 0:
                    continue
                feature_values = forecasts[t_index].features
                for array_f_index, f_index in enumerate(feature_indexes):
                    if f_index >= 0:
                        array[array_t_index, array_l_index, array_f_index] = (
                            feature_values[f_index].value
                        )

        # Check if the array shape matches the number of filtered times, locations
        # and features
        if validity_times is not None and array.shape[0] != len(validity_times):
            raise ValueError(
                f"The count of validity times in the array({array.shape[0]}) does "
                f"not match the requested validity times count ({len(validity_times)})"
            )
        if locations is not None and array.shape[1] != len(locations):
            raise ValueError(
                f"The count of location in the array ({array.shape[1]}) does not "
                f"match the requested location count ({len(locations)})"
            )
        if features is not None and array.shape[2] != len(features):
            raise ValueError(
                f"The count of features in the array ({array.shape[2]}) does not "
                f"match the requested feature count ({len(features)})"
            )

    # catch all exceptions (including the shape-check ValueErrors above) and
    # surface them as RuntimeError with the original exception as cause
    except Exception as e:
        raise RuntimeError("Error processing forecast data") from e

    return array
to_resampled_ndarray ¤
to_resampled_ndarray(
    validity_times: list[datetime],
    locations: list[Location] | None = None,
    features: list[ForecastFeature] | None = None,
    solar_offset_sec: int = 1800,
) -> ndarray[tuple[Any, Any, Any], dtype[float64]]

Convert the forecast to a numpy array and resample to the specified validity_times.

PARAMETER DESCRIPTION
validity_times

The validity times to resample to.

TYPE: list[datetime]

locations

The locations to filter by.

TYPE: list[Location] | None DEFAULT: None

features

The features to filter by.

TYPE: list[ForecastFeature] | None DEFAULT: None

solar_offset_sec

Time offset in seconds to shift solar forecasts

TYPE: int DEFAULT: 1800

RETURNS DESCRIPTION
ndarray[tuple[Any, Any, Any], dtype[float64]]

Numpy array of shape (num_validity_times, num_locations, num_features)

Source code in frequenz/client/weather/_types.py
def to_resampled_ndarray(
    self,
    validity_times: list[dt.datetime],
    locations: list[Location] | None = None,
    features: list[ForecastFeature] | None = None,
    solar_offset_sec: int = 1800,
) -> np.ndarray[
    # the shape is known to be 3 dimensional, but the length of each dimension is
    # not fixed, so we use typing.Any, instead of the usual const generic
    # parameters.
    tuple[typing.Any, typing.Any, typing.Any],
    np.dtype[np.float64],
]:
    """Convert the forecast to a numpy array and resample to the specified validity_times.

    Args:
        validity_times: The validity times to resample to.
        locations: The locations to filter by.
        features: The features to filter by.
        solar_offset_sec: Time offset in seconds to shift solar forecasts

    Returns:
        Numpy array of shape (num_validity_times, num_locations, num_features)
    """
    # Convert all available validity times first, then interpolate onto the
    # requested ones.
    source_times = self._get_validity_times()
    data = self.to_ndarray_vlf(None, locations, features)
    # With no feature filter, resolve the full feature list for the resampler.
    effective_features = features if features else self._get_features()
    return self.upsample_vlf(
        data,
        source_times,
        validity_times,
        effective_features,
        solar_offset_sec,
    )
upsample_vlf ¤
upsample_vlf(
    array: ndarray[tuple[Any, Any, Any], dtype[float64]],
    validity_times: list[datetime],
    target_times: list[datetime],
    features: list[ForecastFeature],
    solar_offset_sec: int = 1800,
) -> ndarray[tuple[Any, Any, Any], dtype[float64]]

Upsample the forecast array to requested timestamps.

PARAMETER DESCRIPTION
array

3D array from to_ndarray_vlf (time, location, feature)

TYPE: ndarray[tuple[Any, Any, Any], dtype[float64]]

validity_times

List of original timestamps

TYPE: list[datetime]

target_times

List of desired timestamps to interpolate to

TYPE: list[datetime]

features

List of forecast features

TYPE: list[ForecastFeature]

solar_offset_sec

Time offset in seconds to shift solar forecasts

TYPE: int DEFAULT: 1800

RETURNS DESCRIPTION
ndarray[tuple[Any, Any, Any], dtype[float64]]

Resampled 3D array with same structure interpolated to target timestamps

RAISES DESCRIPTION
ValueError

If input dimensions don't match or timestamps aren't monotonic

Source code in frequenz/client/weather/_types.py
def upsample_vlf(
    self,
    array: np.ndarray[
        tuple[typing.Any, typing.Any, typing.Any], np.dtype[np.float64]
    ],
    validity_times: list[dt.datetime],
    target_times: list[dt.datetime],
    features: list[ForecastFeature],
    solar_offset_sec: int = 1800,
) -> np.ndarray[tuple[typing.Any, typing.Any, typing.Any], np.dtype[np.float64]]:
    """Upsample the forecast array to requested timestamps.

    Args:
        array: 3D array from to_ndarray_vlf (time, location, feature)
        validity_times: List of original timestamps
        target_times: List of desired timestamps to interpolate to
        features: List of forecast features
        solar_offset_sec: Time offset in seconds to shift solar forecasts

    Returns:
        Resampled 3D array with same structure interpolated to target timestamps

    Raises:
        ValueError: If input dimensions don't match or timestamps aren't monotonic
    """
    # Check input dimensions
    if array.shape[0] != len(validity_times):
        raise ValueError(
            f"Time dimension of input array ({array.shape[0]}) does not match "
            f"number of validity times ({len(validity_times)})"
        )
    if array.shape[2] != len(features):
        raise ValueError(
            f"Feature dimension of input array ({array.shape[2]}) does not match "
            f"number of features ({len(features)})"
        )

    # Validate that both timestamp sequences are strictly increasing:
    # np.interp silently returns wrong results when its x-coordinates are not
    # monotonically increasing, so a non-monotonic validity_times must be
    # rejected here as well (as promised in the docstring).
    if not all(t1 < t2 for t1, t2 in zip(target_times[:-1], target_times[1:])):
        raise ValueError("target_times must be strictly increasing")
    if not all(t1 < t2 for t1, t2 in zip(validity_times[:-1], validity_times[1:])):
        raise ValueError("validity_times must be strictly increasing")

    vts = np.array([t.timestamp() for t in validity_times])
    tts = np.array([t.timestamp() for t in target_times])

    resampled = np.zeros((len(target_times), array.shape[1], array.shape[2]))

    # Get indices of solar and non-solar features
    solar_idxs = [i for i, f in enumerate(features) if f in SOLAR_PARAMETERS]
    other_idxs = [i for i, f in enumerate(features) if f not in SOLAR_PARAMETERS]

    # Handle non-solar features with direct interpolation; target points
    # outside the original time range are filled with NaN.
    if other_idxs:
        resampled[..., other_idxs] = np.apply_along_axis(
            lambda x: np.interp(tts, vts, x, left=np.nan, right=np.nan),
            axis=0,
            arr=array[..., other_idxs],
        )

    # Handle solar features with shifted interpolation
    if solar_idxs:
        # Shift validity times by solar_offset_sec for solar parameters
        shifted_vts = vts + solar_offset_sec
        resampled[..., solar_idxs] = np.apply_along_axis(
            lambda x: np.interp(
                tts,
                shifted_vts,
                x,
                left=np.nan,
                right=np.nan,
            ),
            axis=0,
            arr=array[..., solar_idxs],
        )

    return resampled

frequenz.client.weather.Location dataclass ¤

Location data.

ATTRIBUTE DESCRIPTION
latitude

latitude of the location.

TYPE: float

longitude

longitude of the location.

TYPE: float

country_code

ISO 3166-1 alpha-2 country code of the location.

TYPE: str

Source code in frequenz/client/weather/_types.py
@dataclass(frozen=True)
class Location:
    """Geographic location used in weather forecast requests and responses.

    Attributes:
        latitude: latitude of the location.
        longitude: longitude of the location.
        country_code: ISO 3166-1 alpha-2 country code of the location.
    """

    latitude: float
    longitude: float
    country_code: str

    @classmethod
    def from_pb(cls, location: location_pb2.Location) -> Location:
        """Create a Location from its protobuf representation.

        Args:
            location: protobuf location to convert.

        Returns:
            Location object corresponding to the protobuf message.
        """
        lat = location.latitude
        lon = location.longitude
        code = location.country_code
        return cls(latitude=lat, longitude=lon, country_code=code)

    def to_pb(self) -> location_pb2.Location:
        """Serialize this Location to a protobuf Location message.

        Returns:
            Protobuf message corresponding to the Location object.
        """
        pb_location = location_pb2.Location(
            latitude=self.latitude,
            longitude=self.longitude,
            country_code=self.country_code,
        )
        return pb_location
Functions¤
from_pb classmethod ¤
from_pb(location: Location) -> Location

Convert a protobuf Location message to a Location object.

PARAMETER DESCRIPTION
location

protobuf location to convert.

TYPE: Location

RETURNS DESCRIPTION
Location

Location object corresponding to the protobuf message.

Source code in frequenz/client/weather/_types.py
@classmethod
def from_pb(cls, location: location_pb2.Location) -> Location:
    """Create a Location from its protobuf representation.

    Args:
        location: protobuf location to convert.

    Returns:
        Location object corresponding to the protobuf message.
    """
    lat = location.latitude
    lon = location.longitude
    code = location.country_code
    return cls(latitude=lat, longitude=lon, country_code=code)
to_pb ¤
to_pb() -> Location

Convert a Location object to a protobuf Location message.

RETURNS DESCRIPTION
Location

Protobuf message corresponding to the Location object.

Source code in frequenz/client/weather/_types.py
def to_pb(self) -> location_pb2.Location:
    """Serialize this Location to a protobuf Location message.

    Returns:
        Protobuf message corresponding to the Location object.
    """
    message = location_pb2.Location(
        latitude=self.latitude,
        longitude=self.longitude,
        country_code=self.country_code,
    )
    return message