Appending To Merged Async Generators In Python
I'm trying to merge a bunch of asynchronous generators in Python 3.7 while still adding new async generators on iteration. I'm currently using aiostream to merge my generators: fro
Solution 1:
Here is an implementation that should work efficiently even with a large number of async iterators:
import asyncio

class merge:
    def __init__(self, *iterables):
        self._iterables = list(iterables)
        self._wakeup = asyncio.Event()

    def _add_iters(self, next_futs, on_done):
        # schedule __anext__() on every queued iterator and watch its future
        for it in self._iterables:
            it = it.__aiter__()
            nfut = asyncio.ensure_future(it.__anext__())
            nfut.add_done_callback(on_done)
            next_futs[nfut] = it
        del self._iterables[:]
        return next_futs

    async def __aiter__(self):
        done = {}
        next_futs = {}

        def on_done(nfut):
            done[nfut] = next_futs.pop(nfut)
            self._wakeup.set()

        self._add_iters(next_futs, on_done)
        try:
            while next_futs:
                await self._wakeup.wait()
                self._wakeup.clear()
                # snapshot and reset `done` before yielding: on_done may run
                # while we are suspended at `yield`, and it must not modify a
                # dict that is being iterated
                ready = list(done.items())
                done.clear()
                for nfut, it in ready:
                    try:
                        ret = nfut.result()
                    except StopAsyncIteration:
                        continue
                    self._iterables.append(it)
                    yield ret
                if self._iterables:
                    self._add_iters(next_futs, on_done)
        finally:
            # if the generator exits with an exception, or if the caller stops
            # iterating, make sure our callbacks are removed
            for nfut in next_futs:
                nfut.remove_done_callback(on_done)

    def append_iter(self, new_iter):
        self._iterables.append(new_iter)
        self._wakeup.set()
The only change required for your sample code is that the method is named append_iter, not merge.
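As a rough illustration of the mechanism the class above relies on, the sketch below keeps one pending __anext__() future per source and uses a done callback plus an asyncio.Event to wake the consumer, instead of calling asyncio.wait on the whole set for every item. The names countdown and collect_merged are hypothetical and not part of the answer, and the sketch leaves out append_iter for brevity.

```python
import asyncio


async def countdown(label, n, delay):
    """Hypothetical sample source: yields label + index, n times."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{label}{i}"


async def collect_merged(*sources):
    """Collect every item from all sources, in the order they become ready."""
    wakeup = asyncio.Event()
    pending = {}    # future for the next item -> the iterator it came from
    finished = []   # (future, iterator) pairs whose result is ready

    def on_done(fut):
        # Called by the event loop when one __anext__() call completes.
        finished.append((fut, pending.pop(fut)))
        wakeup.set()

    def schedule(it):
        # Ask one iterator for its next item without blocking on it.
        fut = asyncio.ensure_future(it.__anext__())
        fut.add_done_callback(on_done)
        pending[fut] = it

    for src in sources:
        schedule(src.__aiter__())

    items = []
    while pending:
        await wakeup.wait()
        wakeup.clear()
        # Take a snapshot so callbacks can keep appending safely.
        batch = finished[:]
        finished.clear()
        for fut, it in batch:
            try:
                items.append(fut.result())
            except StopAsyncIteration:
                continue      # this source is exhausted
            schedule(it)      # request the next item from the same source
    return items


if __name__ == "__main__":
    print(asyncio.run(collect_merged(countdown("a", 3, 0.01),
                                     countdown("b", 2, 0.025))))
```

Items from the two sources come back interleaved in the order they become ready, while each source's own order is preserved, because a source is only asked for its next item after the previous one has been collected.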
Solution 2:
This can be done using stream.flatten with an asyncio queue to store the new generators.
import asyncio
from aiostream import stream, pipe

# go() is assumed to be the async generator from the question's sample code

async def main():
    queue = asyncio.Queue()
    await queue.put(go())
    await queue.put(go())
    await queue.put(go())
    xs = stream.call(queue.get)
    ys = stream.cycle(xs)
    zs = stream.flatten(ys, task_limit=5)
    async with zs.stream() as streamer:
        async for item in streamer:
            if item == 50:
                await queue.put(go())
            print(item)
Notice that you may tune the number of tasks that can run at the same time using the task_limit argument. Also note that zs can be elegantly defined using the pipe syntax:
zs = stream.call(queue.get) | pipe.cycle() | pipe.flatten(task_limit=5)
Disclaimer: I am the project maintainer.