Skip to content

weather

frequenz.client.weather ¤

The Weather Forecast API client.

Classes¤

frequenz.client.weather.Client ¤

Bases: BaseApiClient[WeatherForecastServiceStub]

Weather forecast client.

Source code in frequenz/client/weather/_client.py
class Client(BaseApiClient[weather_pb2_grpc.WeatherForecastServiceStub]):
    """Weather forecast client.

    Live forecast streams are cached per requested combination of locations and
    features: asking for the same combination again reuses the existing
    `GrpcStreamBroadcaster` and only attaches a new receiver to it.
    """

    def __init__(
        self,
        server_url: str,
        *,
        connect: bool = True,
        channel_defaults: ChannelOptions = ChannelOptions(),
    ) -> None:
        """Initialize the client.

        Args:
            server_url: The URL of the server to connect to.
            connect: Whether to connect to the server as soon as a client instance is
                created. If `False`, the client will not connect to the server until
                [connect()][frequenz.client.base.client.BaseApiClient.connect] is
                called.
            channel_defaults: Default options for the gRPC channel.
        """
        super().__init__(
            server_url,
            weather_pb2_grpc.WeatherForecastServiceStub,
            connect=connect,
            channel_defaults=channel_defaults,
        )
        # Cache of live streams, keyed by the requested locations followed by
        # the requested features (see `stream_live_forecast()`).
        self._streams: dict[
            tuple[Location | ForecastFeature, ...],
            GrpcStreamBroadcaster[
                weather_pb2.ReceiveLiveWeatherForecastResponse, Forecasts
            ],
        ] = {}

    @property
    def stub(self) -> weather_pb2_grpc.WeatherForecastServiceAsyncStub:
        """The gRPC stub for the API.

        Raises:
            ClientNotConnected: If the client has no channel or no stub.
        """
        if self.channel is None or self._stub is None:
            raise ClientNotConnected(server_url=self.server_url, operation="stub")
        # This type: ignore is needed because we need to cast the sync stub to
        # the async stub, but we can't use cast because the async stub doesn't
        # actually exist to the eyes of the interpreter, it only exists for the
        # type-checker, so it can only be used for type hints.
        return self._stub  # type: ignore

    async def stream_live_forecast(
        self,
        locations: list[Location],
        features: list[ForecastFeature],
    ) -> Receiver[Forecasts]:
        """Stream live weather forecast data.

        The first call for a given combination of locations and features creates
        a stream broadcaster; later calls with the same combination (same
        elements, same order) reuse it.

        Args:
            locations: locations to stream data for.
            features: features to stream data for.

        Returns:
            A channel receiver for weather forecast data.
        """
        # Snapshot the arguments. The request factory below is invoked later by
        # the broadcaster (not during this call), so it must not observe
        # mutations the caller makes to its lists afterwards; otherwise the
        # request could stop matching the key the stream is cached under.
        requested_locations = tuple(locations)
        requested_features = tuple(features)
        stream_key: tuple[Location | ForecastFeature, ...] = (
            *requested_locations,
            *requested_features,
        )

        if stream_key not in self._streams:
            self._streams[stream_key] = GrpcStreamBroadcaster(
                f"weather-forecast-{stream_key}",
                lambda: self.stub.ReceiveLiveWeatherForecast(
                    weather_pb2.ReceiveLiveWeatherForecastRequest(
                        locations=[
                            location.to_pb() for location in requested_locations
                        ],
                        features=[feature.value for feature in requested_features],
                    )
                ),
                Forecasts.from_pb,
            )
        return self._streams[stream_key].new_receiver()

    def hist_forecast_iterator(
        self,
        locations: list[Location],
        features: list[ForecastFeature],
        start: datetime,
        end: datetime,
    ) -> HistoricalForecastIterator:
        """Create an iterator over historical weather forecast data.

        Args:
            locations: locations to stream data for.
            features: features to stream data for.
            start: start of the time range to stream data for.
            end: end of the time range to stream data for.

        Returns:
            A `HistoricalForecastIterator` built from this client's stub and the
                given arguments.
        """
        return HistoricalForecastIterator(self.stub, locations, features, start, end)
Attributes¤
channel property ¤
channel: Channel

The underlying gRPC channel used to communicate with the server.

Warning

This channel is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

channel_defaults property ¤
channel_defaults: ChannelOptions

The default options for the gRPC channel.

is_connected property ¤
is_connected: bool

Whether the client is connected to the server.

server_url property ¤
server_url: str

The URL of the server.

stub property ¤

The gRPC stub for the API.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter a context manager.

Source code in frequenz/client/base/client.py
async def __aenter__(self) -> Self:
    """Enter a context manager.

    Calls `connect()` without arguments before handing the client back.

    Returns:
        This client instance.
    """
    self.connect()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None

Exit a context manager.

Source code in frequenz/client/base/client.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None:
    """Exit a context manager.

    Delegates to the `__aexit__()` of the underlying channel and then forgets
    both the channel and the stub. Does nothing if there is no channel.

    Args:
        _exc_type: Exception type forwarded to the channel's `__aexit__()`.
        _exc_val: Exception value forwarded to the channel's `__aexit__()`.
        _exc_tb: Traceback forwarded to the channel's `__aexit__()`.

    Returns:
        `None` if there is no channel, otherwise whatever the channel's
            `__aexit__()` returned.
    """
    if self._channel is None:
        return None
    result = await self._channel.__aexit__(_exc_type, _exc_val, _exc_tb)
    # Only reached if the channel's `__aexit__()` did not raise: from here on the
    # client holds neither a channel nor a stub.
    self._channel = None
    self._stub = None
    return result
__init__ ¤
__init__(
    server_url: str,
    *,
    connect: bool = True,
    channel_defaults: ChannelOptions = ChannelOptions()
) -> None

Initialize the client.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to.

TYPE: str

connect

Whether to connect to the server as soon as a client instance is created. If False, the client will not connect to the server until connect() is called.

TYPE: bool DEFAULT: True

channel_defaults

Default options for the gRPC channel.

TYPE: ChannelOptions DEFAULT: ChannelOptions()

Source code in frequenz/client/weather/_client.py
def __init__(
    self,
    server_url: str,
    *,
    connect: bool = True,
    channel_defaults: ChannelOptions = ChannelOptions(),
) -> None:
    """Initialize the client.

    Besides initializing the base client, this creates the (initially empty)
    cache of live forecast streams.

    Args:
        server_url: The URL of the server to connect to.
        connect: Whether to connect to the server as soon as a client instance is
            created. If `False`, the client will not connect to the server until
            [connect()][frequenz.client.base.client.BaseApiClient.connect] is
            called.
        channel_defaults: Default options for the gRPC channel.
    """
    super().__init__(
        server_url,
        weather_pb2_grpc.WeatherForecastServiceStub,
        connect=connect,
        channel_defaults=channel_defaults,
    )
    # Cache of stream broadcasters, keyed by a tuple mixing `Location` and
    # `ForecastFeature` items.
    self._streams: dict[
        tuple[Location | ForecastFeature, ...],
        GrpcStreamBroadcaster[
            weather_pb2.ReceiveLiveWeatherForecastResponse, Forecasts
        ],
    ] = {}
connect ¤
connect(server_url: str | None = None) -> None

Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL, this method does nothing. If you want to force a reconnection, you can call disconnect() first.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to. If not provided, the previously used URL is used.

TYPE: str | None DEFAULT: None

Source code in frequenz/client/base/client.py
def connect(self, server_url: str | None = None) -> None:
    """Connect to the server, possibly using a new URL.

    A new channel and stub are created when a URL different from the current
    one is given, or when the client is not connected. If the client is already
    connected and no different URL is requested, nothing happens; to force a
    reconnection, call
    [disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

    Args:
        server_url: The URL of the server to connect to. If not provided, the
            previously used URL is used.
    """
    if server_url is None or server_url == self._server_url:
        # No URL change requested: an existing connection is left untouched.
        if self.is_connected:
            return
    else:
        self._server_url = server_url

    self._channel = parse_grpc_uri(self._server_url, self._channel_defaults)
    self._stub = self._create_stub(self._channel)
disconnect async ¤
disconnect() -> None

Disconnect from the server.

If the client is not connected, this method does nothing.

Source code in frequenz/client/base/client.py
async def disconnect(self) -> None:
    """Disconnect from the server.

    If the client is not connected, this method does nothing.
    """
    # Reuse the context-manager exit path, passing "no exception" to it.
    await self.__aexit__(None, None, None)
hist_forecast_iterator ¤
hist_forecast_iterator(
    locations: list[Location],
    features: list[ForecastFeature],
    start: datetime,
    end: datetime,
) -> HistoricalForecastIterator

Stream historical weather forecast data.

PARAMETER DESCRIPTION
locations

locations to stream data for.

TYPE: list[Location]

features

features to stream data for.

TYPE: list[ForecastFeature]

start

start of the time range to stream data for.

TYPE: datetime

end

end of the time range to stream data for.

TYPE: datetime

RETURNS DESCRIPTION
HistoricalForecastIterator

An iterator over the historical weather forecast data.

Source code in frequenz/client/weather/_client.py
def hist_forecast_iterator(
    self,
    locations: list[Location],
    features: list[ForecastFeature],
    start: datetime,
    end: datetime,
) -> HistoricalForecastIterator:
    """Create an iterator over historical weather forecast data.

    Args:
        locations: locations to stream data for.
        features: features to stream data for.
        start: start of the time range to stream data for.
        end: end of the time range to stream data for.

    Returns:
        A `HistoricalForecastIterator` built from this client's stub and the
            given arguments.
    """
    return HistoricalForecastIterator(self.stub, locations, features, start, end)
stream_live_forecast async ¤
stream_live_forecast(
    locations: list[Location],
    features: list[ForecastFeature],
) -> Receiver[Forecasts]

Stream live weather forecast data.

PARAMETER DESCRIPTION
locations

locations to stream data for.

TYPE: list[Location]

features

features to stream data for.

TYPE: list[ForecastFeature]

RETURNS DESCRIPTION
Receiver[Forecasts]

A channel receiver for weather forecast data.

Source code in frequenz/client/weather/_client.py
async def stream_live_forecast(
    self,
    locations: list[Location],
    features: list[ForecastFeature],
) -> Receiver[Forecasts]:
    """Stream live weather forecast data.

    The first call for a given combination of locations and features creates a
    stream broadcaster; later calls with the same combination (same elements,
    same order) reuse it.

    Args:
        locations: locations to stream data for.
        features: features to stream data for.

    Returns:
        A channel receiver for weather forecast data.
    """
    # Snapshot the arguments. The request factory below is invoked later by the
    # broadcaster (not during this call), so it must not observe mutations the
    # caller makes to its lists afterwards; otherwise the request could stop
    # matching the key the stream is cached under.
    requested_locations = tuple(locations)
    requested_features = tuple(features)
    stream_key: tuple[Location | ForecastFeature, ...] = (
        *requested_locations,
        *requested_features,
    )

    if stream_key not in self._streams:
        self._streams[stream_key] = GrpcStreamBroadcaster(
            f"weather-forecast-{stream_key}",
            lambda: self.stub.ReceiveLiveWeatherForecast(
                weather_pb2.ReceiveLiveWeatherForecastRequest(
                    locations=[location.to_pb() for location in requested_locations],
                    features=[feature.value for feature in requested_features],
                )
            ),
            Forecasts.from_pb,
        )
    return self._streams[stream_key].new_receiver()

frequenz.client.weather.ForecastFeature ¤

Bases: Enum

Weather forecast features available through the API.

Source code in frequenz/client/weather/_types.py
class ForecastFeature(enum.Enum):
    """Weather forecast features available through the API."""

    UNSPECIFIED = weather_pb2.ForecastFeature.FORECAST_FEATURE_UNSPECIFIED
    """Unspecified forecast feature."""

    TEMPERATURE_2_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_TEMPERATURE_2_METRE
    )
    """Temperature at 2m above the earth's surface."""

    U_WIND_COMPONENT_100_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_U_WIND_COMPONENT_100_METRE
    )
    """Eastward wind component at 100m altitude."""

    V_WIND_COMPONENT_100_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_V_WIND_COMPONENT_100_METRE
    )
    """Northward wind component at 100m altitude."""

    U_WIND_COMPONENT_10_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_U_WIND_COMPONENT_10_METRE
    )
    """Eastward wind component at 10m altitude."""

    V_WIND_COMPONENT_10_METRE = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_V_WIND_COMPONENT_10_METRE
    )
    """Northward wind component at 10m altitude."""

    SURFACE_SOLAR_RADIATION_DOWNWARDS = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_SURFACE_SOLAR_RADIATION_DOWNWARDS
    )
    """Surface solar radiation downwards."""

    SURFACE_NET_SOLAR_RADIATION = (
        weather_pb2.ForecastFeature.FORECAST_FEATURE_SURFACE_NET_SOLAR_RADIATION
    )
    """Surface net solar radiation."""

    @classmethod
    def from_pb(
        cls, forecast_feature: weather_pb2.ForecastFeature.ValueType
    ) -> ForecastFeature:
        """Map a protobuf ForecastFeature value onto the matching enum member.

        Values that do not correspond to any member are logged as a warning and
        mapped to `UNSPECIFIED`.

        Args:
            forecast_feature: protobuf forecast feature to convert.

        Returns:
            Enum value corresponding to the protobuf message.
        """
        try:
            # Lookup by value raises ValueError when no member has this value.
            return cls(forecast_feature)
        except ValueError:
            _logger.warning(
                "Unknown forecast feature %s. Returning UNSPECIFIED.", forecast_feature
            )
            return cls.UNSPECIFIED
Attributes¤
SURFACE_NET_SOLAR_RADIATION class-attribute instance-attribute ¤
SURFACE_NET_SOLAR_RADIATION = (
    FORECAST_FEATURE_SURFACE_NET_SOLAR_RADIATION
)

Surface net solar radiation.

SURFACE_SOLAR_RADIATION_DOWNWARDS class-attribute instance-attribute ¤
SURFACE_SOLAR_RADIATION_DOWNWARDS = (
    FORECAST_FEATURE_SURFACE_SOLAR_RADIATION_DOWNWARDS
)

Surface solar radiation downwards.

TEMPERATURE_2_METRE class-attribute instance-attribute ¤
TEMPERATURE_2_METRE = FORECAST_FEATURE_TEMPERATURE_2_METRE

Temperature at 2m above the earth's surface.

UNSPECIFIED class-attribute instance-attribute ¤
UNSPECIFIED = FORECAST_FEATURE_UNSPECIFIED

Unspecified forecast feature.

U_WIND_COMPONENT_100_METRE class-attribute instance-attribute ¤
U_WIND_COMPONENT_100_METRE = (
    FORECAST_FEATURE_U_WIND_COMPONENT_100_METRE
)

Eastward wind component at 100m altitude.

U_WIND_COMPONENT_10_METRE class-attribute instance-attribute ¤
U_WIND_COMPONENT_10_METRE = (
    FORECAST_FEATURE_U_WIND_COMPONENT_10_METRE
)

Eastward wind component at 10m altitude.

V_WIND_COMPONENT_100_METRE class-attribute instance-attribute ¤
V_WIND_COMPONENT_100_METRE = (
    FORECAST_FEATURE_V_WIND_COMPONENT_100_METRE
)

Northward wind component at 100m altitude.

V_WIND_COMPONENT_10_METRE class-attribute instance-attribute ¤
V_WIND_COMPONENT_10_METRE = (
    FORECAST_FEATURE_V_WIND_COMPONENT_10_METRE
)

Northward wind component at 10m altitude.

Functions¤
from_pb classmethod ¤
from_pb(forecast_feature: ValueType) -> ForecastFeature

Convert a protobuf ForecastFeature value to ForecastFeature enum.

PARAMETER DESCRIPTION
forecast_feature

protobuf forecast feature to convert.

TYPE: ValueType

RETURNS DESCRIPTION
ForecastFeature

Enum value corresponding to the protobuf message.

Source code in frequenz/client/weather/_types.py
@classmethod
def from_pb(
    cls, forecast_feature: weather_pb2.ForecastFeature.ValueType
) -> ForecastFeature:
    """Map a protobuf ForecastFeature value onto the matching enum member.

    Values that do not correspond to any member are logged as a warning and
    mapped to `UNSPECIFIED`.

    Args:
        forecast_feature: protobuf forecast feature to convert.

    Returns:
        Enum value corresponding to the protobuf message.
    """
    try:
        # Lookup by value raises ValueError when no member has this value.
        return cls(forecast_feature)
    except ValueError:
        _logger.warning(
            "Unknown forecast feature %s. Returning UNSPECIFIED.", forecast_feature
        )
        return cls.UNSPECIFIED

frequenz.client.weather.Forecasts dataclass ¤

Weather forecast data.

Source code in frequenz/client/weather/_types.py
@dataclass(frozen=True)
class Forecasts:
    """Weather forecast data."""

    # The wrapped protobuf response; all accessors read from it lazily.
    _forecasts_pb: weather_pb2.ReceiveLiveWeatherForecastResponse

    @classmethod
    def from_pb(
        cls, forecasts: weather_pb2.ReceiveLiveWeatherForecastResponse
    ) -> Forecasts:
        """Wrap a protobuf live forecast response in a `Forecasts` object.

        Args:
            forecasts: protobuf message with live forecast data.

        Returns:
            `Forecasts` object wrapping the protobuf message.
        """
        return cls(_forecasts_pb=forecasts)

    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    def to_ndarray_vlf(
        self,
        validity_times: list[dt.datetime] | None = None,
        locations: list[Location] | None = None,
        features: list[ForecastFeature] | None = None,
    ) -> np.ndarray[
        # the shape is known to be 3 dimensional, but the length of each dimension is
        # not fixed, so we use typing.Any, instead of the usual const generic
        # parameters.
        tuple[typing.Any, typing.Any, typing.Any],
        np.dtype[np.float64],
    ]:
        """Convert the forecasts to a numpy array, using NaN to mark missing data.

        The array is indexed as `[validity_time, location, feature]`. When a
        filter is given, the corresponding axis follows the order of the filter
        and every requested entry that is not present in the data is left as
        NaN. When a filter is `None`, all entries of that axis are returned in
        the order in which they appear in the data.

        The available validity times are taken from the first location, and the
        available features from the first forecast of the first location.

        Args:
            validity_times: The validity times to filter by. Timezone-aware
                values are converted to UTC before matching, because the
                protobuf timestamps are compared as naive UTC datetimes.
            locations: The locations to filter by.
            features: The features to filter by.

        Returns:
            Numpy array of shape (num_validity_times, num_locations, num_features)

        Raises:
            ValueError: If the forecast data contains no location forecasts.
            RuntimeError: If processing the data fails for any other reason,
                for example when the first location has no forecasts or when an
                empty filter list is passed for an axis that has data. The
                original exception is chained as the cause.
        """
        location_forecasts = self._forecasts_pb.location_forecasts

        # check for empty forecasts data
        if not location_forecasts:
            raise ValueError("Forecast data is missing or invalid.")

        try:
            # The first location (and its first forecast) define which validity
            # times and features are available.
            reference_forecasts = location_forecasts[0].forecasts
            reference_features = reference_forecasts[0].features

            location_indexes: list[int]
            validity_times_indexes: list[int]
            feature_indexes: list[int]

            # For each filter, map every available key to the index of its first
            # occurrence in the proto; requested keys that are not available are
            # remembered as -1 so that their slot in the array stays NaN.
            if locations:
                known_locations: dict[Location, int] = {}
                for l_index, location_forecast in enumerate(location_forecasts):
                    known_locations.setdefault(
                        Location.from_pb(location_forecast.location), l_index
                    )
                location_indexes = [
                    known_locations.get(location, -1) for location in locations
                ]
            else:
                location_indexes = list(range(len(location_forecasts)))

            if validity_times:
                known_times: dict[dt.datetime, int] = {}
                for t_index, forecast in enumerate(reference_forecasts):
                    known_times.setdefault(forecast.valid_at_ts.ToDatetime(), t_index)
                # A timezone-aware datetime never compares equal to a naive one,
                # so aware requests are normalized to naive UTC first.
                validity_times_indexes = [
                    known_times.get(
                        (
                            requested.astimezone(dt.timezone.utc).replace(tzinfo=None)
                            if requested.utcoffset() is not None
                            else requested
                        ),
                        -1,
                    )
                    for requested in validity_times
                ]
            else:
                validity_times_indexes = list(range(len(reference_forecasts)))

            if features:
                known_features: dict[ForecastFeature, int] = {}
                for f_index, feature in enumerate(reference_features):
                    known_features.setdefault(
                        ForecastFeature.from_pb(feature.feature), f_index
                    )
                feature_indexes = [
                    known_features.get(requested, -1) for requested in features
                ]
            else:
                feature_indexes = list(range(len(reference_features)))

            array = np.full(
                (
                    len(validity_times_indexes),
                    len(location_indexes),
                    len(feature_indexes),
                ),
                np.nan,
            )

            for out_l, l_index in enumerate(location_indexes):
                if l_index < 0:
                    continue  # requested location is not in the data: stays NaN
                forecasts = location_forecasts[l_index].forecasts
                for out_t, t_index in enumerate(validity_times_indexes):
                    if t_index < 0:
                        continue  # requested validity time is not in the data
                    for out_f, f_index in enumerate(feature_indexes):
                        if f_index >= 0:
                            array[out_t, out_l, out_f] = (
                                forecasts[t_index].features[f_index].value
                            )

            # Check if the array shape matches the number of filtered times, locations
            # and features
            if validity_times is not None and array.shape[0] != len(validity_times):
                raise ValueError(
                    f"The count of validity times in the array ({array.shape[0]}) "
                    "does not match the requested validity times count "
                    f"({len(validity_times)})."
                )
            if locations is not None and array.shape[1] != len(locations):
                raise ValueError(
                    f"The count of location in the array ({array.shape[1]}) does not "
                    f"match the requested location count ({len(locations)})."
                )
            if features is not None and array.shape[2] != len(features):
                raise ValueError(
                    f"The count of features in the array ({array.shape[2]}) does not "
                    f"match the requested feature count ({len(features)})."
                )

        # Any failure while reading the proto or building the array (including
        # the ValueErrors raised just above) is reported as a RuntimeError.
        except Exception as e:
            raise RuntimeError("Error processing forecast data") from e

        return array
Functions¤
from_pb classmethod ¤

Convert a protobuf Forecast message to Forecast object.

PARAMETER DESCRIPTION
forecasts

protobuf message with live forecast data.

TYPE: ReceiveLiveWeatherForecastResponse

RETURNS DESCRIPTION
Forecasts

Forecast object corresponding to the protobuf message.

Source code in frequenz/client/weather/_types.py
@classmethod
def from_pb(
    cls, forecasts: weather_pb2.ReceiveLiveWeatherForecastResponse
) -> Forecasts:
    """Wrap a protobuf live forecast response in a `Forecasts` object.

    The message is stored as-is; no data is converted at this point.

    Args:
        forecasts: protobuf message with live forecast data.

    Returns:
        `Forecasts` object wrapping the protobuf message.
    """
    return cls(_forecasts_pb=forecasts)
to_ndarray_vlf ¤
to_ndarray_vlf(
    validity_times: list[datetime] | None = None,
    locations: list[Location] | None = None,
    features: list[ForecastFeature] | None = None,
) -> ndarray[tuple[Any, Any, Any], dtype[float64]]

Convert the forecasts to a numpy array, using NaN to mark missing data.

If any of the filters are None, all values for that parameter will be returned.

PARAMETER DESCRIPTION
validity_times

The validity times to filter by.

TYPE: list[datetime] | None DEFAULT: None

locations

The locations to filter by.

TYPE: list[Location] | None DEFAULT: None

features

The features to filter by.

TYPE: list[ForecastFeature] | None DEFAULT: None

RETURNS DESCRIPTION
ndarray[tuple[Any, Any, Any], dtype[float64]]

Numpy array of shape (num_validity_times, num_locations, num_features)

RAISES DESCRIPTION
ValueError

If the forecasts data is missing or invalid.

Source code in frequenz/client/weather/_types.py
def to_ndarray_vlf(
    self,
    validity_times: list[dt.datetime] | None = None,
    locations: list[Location] | None = None,
    features: list[ForecastFeature] | None = None,
) -> np.ndarray[
    # the shape is known to be 3 dimensional, but the length of each dimension is
    # not fixed, so we use typing.Any, instead of the usual const generic
    # parameters.
    tuple[typing.Any, typing.Any, typing.Any],
    np.dtype[np.float64],
]:
    """Convert the forecasts to a numpy array, using NaN to mark missing data.

    The array is indexed as `[validity_time, location, feature]`. When a filter
    is given, the corresponding axis follows the order of the filter and every
    requested entry that is not present in the data is left as NaN. When a
    filter is `None`, all entries of that axis are returned in the order in
    which they appear in the data.

    The available validity times are taken from the first location, and the
    available features from the first forecast of the first location.

    Args:
        validity_times: The validity times to filter by. Timezone-aware values
            are converted to UTC before matching, because the protobuf
            timestamps are compared as naive UTC datetimes.
        locations: The locations to filter by.
        features: The features to filter by.

    Returns:
        Numpy array of shape (num_validity_times, num_locations, num_features)

    Raises:
        ValueError: If the forecast data contains no location forecasts.
        RuntimeError: If processing the data fails for any other reason, for
            example when the first location has no forecasts or when an empty
            filter list is passed for an axis that has data. The original
            exception is chained as the cause.
    """
    location_forecasts = self._forecasts_pb.location_forecasts

    # check for empty forecasts data
    if not location_forecasts:
        raise ValueError("Forecast data is missing or invalid.")

    try:
        # The first location (and its first forecast) define which validity
        # times and features are available.
        reference_forecasts = location_forecasts[0].forecasts
        reference_features = reference_forecasts[0].features

        location_indexes: list[int]
        validity_times_indexes: list[int]
        feature_indexes: list[int]

        # For each filter, map every available key to the index of its first
        # occurrence in the proto; requested keys that are not available are
        # remembered as -1 so that their slot in the array stays NaN.
        if locations:
            known_locations: dict[Location, int] = {}
            for l_index, location_forecast in enumerate(location_forecasts):
                known_locations.setdefault(
                    Location.from_pb(location_forecast.location), l_index
                )
            location_indexes = [
                known_locations.get(location, -1) for location in locations
            ]
        else:
            location_indexes = list(range(len(location_forecasts)))

        if validity_times:
            known_times: dict[dt.datetime, int] = {}
            for t_index, forecast in enumerate(reference_forecasts):
                known_times.setdefault(forecast.valid_at_ts.ToDatetime(), t_index)
            # A timezone-aware datetime never compares equal to a naive one, so
            # aware requests are normalized to naive UTC first.
            validity_times_indexes = [
                known_times.get(
                    (
                        requested.astimezone(dt.timezone.utc).replace(tzinfo=None)
                        if requested.utcoffset() is not None
                        else requested
                    ),
                    -1,
                )
                for requested in validity_times
            ]
        else:
            validity_times_indexes = list(range(len(reference_forecasts)))

        if features:
            known_features: dict[ForecastFeature, int] = {}
            for f_index, feature in enumerate(reference_features):
                known_features.setdefault(
                    ForecastFeature.from_pb(feature.feature), f_index
                )
            feature_indexes = [
                known_features.get(requested, -1) for requested in features
            ]
        else:
            feature_indexes = list(range(len(reference_features)))

        array = np.full(
            (
                len(validity_times_indexes),
                len(location_indexes),
                len(feature_indexes),
            ),
            np.nan,
        )

        for out_l, l_index in enumerate(location_indexes):
            if l_index < 0:
                continue  # requested location is not in the data: stays NaN
            forecasts = location_forecasts[l_index].forecasts
            for out_t, t_index in enumerate(validity_times_indexes):
                if t_index < 0:
                    continue  # requested validity time is not in the data
                for out_f, f_index in enumerate(feature_indexes):
                    if f_index >= 0:
                        array[out_t, out_l, out_f] = (
                            forecasts[t_index].features[f_index].value
                        )

        # Check if the array shape matches the number of filtered times, locations
        # and features
        if validity_times is not None and array.shape[0] != len(validity_times):
            raise ValueError(
                f"The count of validity times in the array ({array.shape[0]}) "
                "does not match the requested validity times count "
                f"({len(validity_times)})."
            )
        if locations is not None and array.shape[1] != len(locations):
            raise ValueError(
                f"The count of location in the array ({array.shape[1]}) does not "
                f"match the requested location count ({len(locations)})."
            )
        if features is not None and array.shape[2] != len(features):
            raise ValueError(
                f"The count of features in the array ({array.shape[2]}) does not "
                f"match the requested feature count ({len(features)})."
            )

    # Any failure while reading the proto or building the array (including the
    # ValueErrors raised just above) is reported as a RuntimeError.
    except Exception as e:
        raise RuntimeError("Error processing forecast data") from e

    return array

frequenz.client.weather.Location dataclass ¤

Location data.

ATTRIBUTE DESCRIPTION
latitude

latitude of the location.

TYPE: float

longitude

longitude of the location.

TYPE: float

country_code

ISO 3166-1 alpha-2 country code of the location.

TYPE: str

Source code in frequenz/client/weather/_types.py
@dataclass(frozen=True)
class Location:
    """Immutable description of a location.

    Attributes:
        latitude: latitude of the location.
        longitude: longitude of the location.
        country_code: ISO 3166-1 alpha-2 country code of the location.
    """

    latitude: float
    longitude: float
    country_code: str

    @classmethod
    def from_pb(cls, location: location_pb2.Location) -> Location:
        """Build a `Location` from a protobuf Location message.

        Args:
            location: protobuf location to convert.

        Returns:
            Location object corresponding to the protobuf message.
        """
        lat, lon, code = (
            location.latitude,
            location.longitude,
            location.country_code,
        )
        return cls(lat, lon, code)

    def to_pb(self) -> location_pb2.Location:
        """Build a protobuf Location message from this object.

        Returns:
            Protobuf message corresponding to the Location object.
        """
        fields = {
            "latitude": self.latitude,
            "longitude": self.longitude,
            "country_code": self.country_code,
        }
        return location_pb2.Location(**fields)
Functions¤
from_pb classmethod ¤
from_pb(location: Location) -> Location

Convert a protobuf Location message to Location object.

PARAMETER DESCRIPTION
location

protobuf location to convert.

TYPE: Location

RETURNS DESCRIPTION
Location

Location object corresponding to the protobuf message.

Source code in frequenz/client/weather/_types.py
@classmethod
def from_pb(cls, location: location_pb2.Location) -> Location:
    """Convert a protobuf Location message to Location object.

    The latitude, longitude and country code are copied over unchanged.

    Args:
        location: protobuf location to convert.

    Returns:
        Location object corresponding to the protobuf message.
    """
    return cls(
        latitude=location.latitude,
        longitude=location.longitude,
        country_code=location.country_code,
    )
to_pb ¤
to_pb() -> Location

Convert a Location object to protobuf Location message.

RETURNS DESCRIPTION
Location

Protobuf message corresponding to the Location object.

Source code in frequenz/client/weather/_types.py
def to_pb(self) -> location_pb2.Location:
    """Convert a Location object to protobuf Location message.

    The latitude, longitude and country code are copied over unchanged.

    Returns:
        Protobuf message corresponding to the Location object.
    """
    return location_pb2.Location(
        latitude=self.latitude,
        longitude=self.longitude,
        country_code=self.country_code,
    )