Expects AsyncIterator class to raise StopAsyncIteration
exception once no more messages are expected or the channel
is closed in case of Receiver class.
Select the next available message from a group of AsyncIterators.
If Select was created with more AsyncIterator than what are read in
the if-chain after each call to ready(),
messages coming in the additional async iterators are dropped, and
a warning message is logged.
For example, if there are two async iterators that you want to
simultaneously wait on, this can be done with:
select=Select(name1=receiver1,name2=receiver2)whileawaitselect.ready():ifmsg:=select.name1:ifval:=msg.inner:# do something with `val`passelse:# handle closure of receiver.passelifmsg:=select.name2:# do something with `msg.inner`pass
classSelect:"""Select the next available message from a group of AsyncIterators. If `Select` was created with more `AsyncIterator` than what are read in the if-chain after each call to [ready()][frequenz.channels.Select.ready], messages coming in the additional async iterators are dropped, and a warning message is logged. [Receiver][frequenz.channels.Receiver]s also function as `AsyncIterator`. Example: For example, if there are two async iterators that you want to simultaneously wait on, this can be done with: ```python select = Select(name1 = receiver1, name2 = receiver2) while await select.ready(): if msg := select.name1: if val := msg.inner: # do something with `val` pass else: # handle closure of receiver. pass elif msg := select.name2: # do something with `msg.inner` pass ``` """def__init__(self,**kwargs:AsyncIterator[Any])->None:"""Create a `Select` instance. Args: **kwargs: sequence of async iterators """self._receivers=kwargsself._pending:Set[asyncio.Task[Any]]=set()forname,recvinself._receivers.items():# can replace __anext__() to anext() (Only Python 3.10>=)msg=recv.__anext__()# pylint: disable=unnecessary-dunder-callself._pending.add(asyncio.create_task(msg,name=name))# type: ignoreself._ready_count=0self._prev_ready_count=0self._result:Dict[str,Optional[_Selected]]={name:Nonefornameinself._receivers}def__del__(self)->None:"""Cleanup any pending tasks."""fortaskinself._pending:task.cancel()asyncdefready(self)->bool:"""Wait until there is a message in any of the async iterators. Returns `True` if there is a message available, and `False` if all async iterators have closed. Returns: Whether there are further messages or not. """ifself._ready_count>0:ifself._ready_count==self._prev_ready_count:dropped_names:List[str]=[]forname,valueinself._result.items():ifvalueisnotNone:dropped_names.append(name)self._result[name]=Noneself._ready_count=0self._prev_ready_count=0logger.warning("Select.ready() dropped data from async iterator(s): %s, ""because no messages have been fetched since the last call to ready().",dropped_names,)else:self._prev_ready_count=self._ready_countreturnTrueiflen(self._pending)==0:returnFalse# once all the pending messages have been consumed, reset the# `_prev_ready_count` as well, and wait for new messages.self._prev_ready_count=0done,self._pending=awaitasyncio.wait(self._pending,return_when=asyncio.FIRST_COMPLETED)foritemindone:name=item.get_name()ifisinstance(item.exception(),StopAsyncIteration):result=Noneelse:result=item.result()self._ready_count+=1self._result[name]=_Selected(result)# if channel or AsyncIterator is closed# don't add a task for it again.ifresultisNone:continuemsg=self._receivers[# pylint: disable=unnecessary-dunder-callname].__anext__()self._pending.add(asyncio.create_task(msg,name=name))# type: ignorereturnTruedef__getattr__(self,name:str)->Optional[Any]:"""Return the latest unread message from a `AsyncIterator`, if available. Args: name: Name of the channel. Returns: Latest unread message for the specified `AsyncIterator`, or `None`. Raises: KeyError: when the name was not specified when creating the `Select` instance. """result=self._result[name]ifresultisNone:returnresultself._result[name]=Noneself._ready_count-=1returnresult
def__getattr__(self,name:str)->Optional[Any]:"""Return the latest unread message from a `AsyncIterator`, if available. Args: name: Name of the channel. Returns: Latest unread message for the specified `AsyncIterator`, or `None`. Raises: KeyError: when the name was not specified when creating the `Select` instance. """result=self._result[name]ifresultisNone:returnresultself._result[name]=Noneself._ready_count-=1returnresult
asyncdefready(self)->bool:"""Wait until there is a message in any of the async iterators. Returns `True` if there is a message available, and `False` if all async iterators have closed. Returns: Whether there are further messages or not. """ifself._ready_count>0:ifself._ready_count==self._prev_ready_count:dropped_names:List[str]=[]forname,valueinself._result.items():ifvalueisnotNone:dropped_names.append(name)self._result[name]=Noneself._ready_count=0self._prev_ready_count=0logger.warning("Select.ready() dropped data from async iterator(s): %s, ""because no messages have been fetched since the last call to ready().",dropped_names,)else:self._prev_ready_count=self._ready_countreturnTrueiflen(self._pending)==0:returnFalse# once all the pending messages have been consumed, reset the# `_prev_ready_count` as well, and wait for new messages.self._prev_ready_count=0done,self._pending=awaitasyncio.wait(self._pending,return_when=asyncio.FIRST_COMPLETED)foritemindone:name=item.get_name()ifisinstance(item.exception(),StopAsyncIteration):result=Noneelse:result=item.result()self._ready_count+=1self._result[name]=_Selected(result)# if channel or AsyncIterator is closed# don't add a task for it again.ifresultisNone:continuemsg=self._receivers[# pylint: disable=unnecessary-dunder-callname].__anext__()self._pending.add(asyncio.create_task(msg,name=name))# type: ignorereturnTrue