asyncpg-listen
This library simplifies usage of listen/notify with asyncpg:
- Handles loss of a connection
- Simplifies notifications processing from multiple channels
- Setups a timeout for receiving a notification
- Allows to receive all notifications/only last notification depending on ListenPolicy.
import asyncio
import asyncpg
import asyncpg_listen
async def handle_notifications(notification: asyncpg_listen.NotificationOrTimeout) -> None:
print(f"{notification} has been received")
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func())
listener_task = asyncio.create_task(
listener.run(
{"channel": handle_notifications},
policy=asyncpg_listen.ListenPolicy.LAST,
notification_timeout=1
)
)
async with asyncpg.connect() as connection:
for i in range(42):
await connection.execute(f"NOTIFY simple, '{i}'")