Skip to content

streaming

frequenz.client.base.streaming ¤

Implementation of the grpc streaming helper.

Attributes¤

frequenz.client.base.streaming.InputT module-attribute ¤

InputT = TypeVar('InputT')

The input type of the stream.

frequenz.client.base.streaming.OutputT module-attribute ¤

OutputT = TypeVar('OutputT')

The output type of the stream.

Classes¤

frequenz.client.base.streaming.GrpcStreamBroadcaster ¤

Bases: Generic[InputT, OutputT]

Helper class to handle grpc streaming methods.

Source code in frequenz/client/base/streaming.py
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
    """Helper class to handle grpc streaming methods."""

    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
        self,
        stream_name: str,
        stream_method: Callable[[], AsyncIterable[InputT]],
        transform: Callable[[InputT], OutputT],
        retry_strategy: retry.Strategy | None = None,
        retry_on_exhausted_stream: bool = False,
    ):
        """Initialize the streaming helper.

        Args:
            stream_name: A name to identify the stream in the logs.
            stream_method: A function that returns the grpc stream. This function is
                called everytime the connection is lost and we want to retry.
            transform: A function to transform the input type to the output type.
            retry_strategy: The retry strategy to use, when the connection is lost. Defaults
                to retries every 3 seconds, with a jitter of 1 second, indefinitely.
            retry_on_exhausted_stream: Whether to retry when the stream is exhausted, i.e.
                when the server closes the stream. Defaults to False.
        """
        self._stream_name = stream_name
        self._stream_method = stream_method
        self._transform = transform
        self._retry_strategy = (
            retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy()
        )
        self._retry_on_exhausted_stream = retry_on_exhausted_stream

        self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
            name=f"GrpcStreamBroadcaster-{stream_name}"
        )
        self._task = asyncio.create_task(self._run())

    def new_receiver(self, maxsize: int = 50) -> channels.Receiver[OutputT]:
        """Create a new receiver for the stream.

        Args:
            maxsize: The maximum number of messages to buffer.

        Returns:
            A new receiver.
        """
        return self._channel.new_receiver(limit=maxsize)

    @property
    def is_running(self) -> bool:
        """Return whether the streaming helper is running.

        Returns:
            Whether the streaming helper is running.
        """
        return not self._task.done()

    async def stop(self) -> None:
        """Stop the streaming helper."""
        if self._task.done():
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        await self._channel.close()

    async def _run(self) -> None:
        """Run the streaming helper."""
        sender = self._channel.new_sender()

        while True:
            error: Exception | None = None
            _logger.info("%s: starting to stream", self._stream_name)
            try:
                call = self._stream_method()
                async for msg in call:
                    await sender.send(self._transform(msg))
            except grpc.aio.AioRpcError as err:
                error = err
            if error is None and not self._retry_on_exhausted_stream:
                _logger.info(
                    "%s: connection closed, stream exhausted", self._stream_name
                )
                await self._channel.close()
                break
            error_str = f"Error: {error}" if error else "Stream exhausted"
            interval = self._retry_strategy.next_interval()
            if interval is None:
                _logger.error(
                    "%s: connection ended, retry limit exceeded (%s), giving up. %s.",
                    self._stream_name,
                    self._retry_strategy.get_progress(),
                    error_str,
                )
                await self._channel.close()
                break
            _logger.warning(
                "%s: connection ended, retrying %s in %0.3f seconds. %s.",
                self._stream_name,
                self._retry_strategy.get_progress(),
                interval,
                error_str,
            )
            await asyncio.sleep(interval)
Attributes¤
is_running property ¤
is_running: bool

Return whether the streaming helper is running.

RETURNS DESCRIPTION
bool

Whether the streaming helper is running.

Functions¤
__init__ ¤
__init__(
    stream_name: str,
    stream_method: Callable[[], AsyncIterable[InputT]],
    transform: Callable[[InputT], OutputT],
    retry_strategy: Strategy | None = None,
    retry_on_exhausted_stream: bool = False,
)

Initialize the streaming helper.

PARAMETER DESCRIPTION
stream_name

A name to identify the stream in the logs.

TYPE: str

stream_method

A function that returns the grpc stream. This function is called everytime the connection is lost and we want to retry.

TYPE: Callable[[], AsyncIterable[InputT]]

transform

A function to transform the input type to the output type.

TYPE: Callable[[InputT], OutputT]

retry_strategy

The retry strategy to use, when the connection is lost. Defaults to retries every 3 seconds, with a jitter of 1 second, indefinitely.

TYPE: Strategy | None DEFAULT: None

retry_on_exhausted_stream

Whether to retry when the stream is exhausted, i.e. when the server closes the stream. Defaults to False.

TYPE: bool DEFAULT: False

Source code in frequenz/client/base/streaming.py
def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
    self,
    stream_name: str,
    stream_method: Callable[[], AsyncIterable[InputT]],
    transform: Callable[[InputT], OutputT],
    retry_strategy: retry.Strategy | None = None,
    retry_on_exhausted_stream: bool = False,
):
    """Initialize the streaming helper.

    Args:
        stream_name: A name to identify the stream in the logs.
        stream_method: A function that returns the grpc stream. This function is
            called everytime the connection is lost and we want to retry.
        transform: A function to transform the input type to the output type.
        retry_strategy: The retry strategy to use, when the connection is lost. Defaults
            to retries every 3 seconds, with a jitter of 1 second, indefinitely.
        retry_on_exhausted_stream: Whether to retry when the stream is exhausted, i.e.
            when the server closes the stream. Defaults to False.
    """
    self._stream_name = stream_name
    self._stream_method = stream_method
    self._transform = transform
    self._retry_strategy = (
        retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy()
    )
    self._retry_on_exhausted_stream = retry_on_exhausted_stream

    self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
        name=f"GrpcStreamBroadcaster-{stream_name}"
    )
    self._task = asyncio.create_task(self._run())
new_receiver ¤
new_receiver(maxsize: int = 50) -> Receiver[OutputT]

Create a new receiver for the stream.

PARAMETER DESCRIPTION
maxsize

The maximum number of messages to buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[OutputT]

A new receiver.

Source code in frequenz/client/base/streaming.py
def new_receiver(self, maxsize: int = 50) -> channels.Receiver[OutputT]:
    """Create a new receiver for the stream.

    Args:
        maxsize: The maximum number of messages to buffer.

    Returns:
        A new receiver.
    """
    return self._channel.new_receiver(limit=maxsize)
stop async ¤
stop() -> None

Stop the streaming helper.

Source code in frequenz/client/base/streaming.py
async def stop(self) -> None:
    """Stop the streaming helper."""
    if self._task.done():
        return
    self._task.cancel()
    try:
        await self._task
    except asyncio.CancelledError:
        pass
    await self._channel.close()