Thread-safe asyncio-aware queue for Python

Overview

janus

A mixed sync-async queue, intended for communication between classic synchronous (threaded) code and asynchronous (asyncio) code.

Like the god Janus, the queue object from this library has two faces: a synchronous interface and an asynchronous one.

The synchronous interface is fully compatible with the standard library's queue module; the asynchronous one follows the design of asyncio.Queue.
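To make the "two faces" idea concrete, here is a minimal one-directional bridge built only from the standard library: a worker thread hands items to a coroutine through loop.call_soon_threadsafe(). It is a hypothetical illustration (the class ThreadToAsyncBridge is not part of janus) and is not how janus works internally.

```python
import asyncio
import threading
from typing import Generic, List, TypeVar

T = TypeVar("T")


class ThreadToAsyncBridge(Generic[T]):
    """Hypothetical sketch: a sync 'put' face for threads, an async 'get' face for coroutines."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # The asyncio.Queue is only ever touched from the event loop thread.
        self._async_q: "asyncio.Queue[T]" = asyncio.Queue()

    def sync_put(self, item: T) -> None:
        # Safe to call from any thread: schedules put_nowait() on the loop thread.
        self._loop.call_soon_threadsafe(self._async_q.put_nowait, item)

    async def async_get(self) -> T:
        # Must be awaited on the loop that owns the bridge.
        return await self._async_q.get()


def producer(bridge: "ThreadToAsyncBridge[int]", n: int) -> None:
    for i in range(n):
        bridge.sync_put(i)


async def main(n: int = 100) -> List[int]:
    bridge: "ThreadToAsyncBridge[int]" = ThreadToAsyncBridge(asyncio.get_running_loop())
    thread = threading.Thread(target=producer, args=(bridge, n))
    thread.start()
    received = [await bridge.async_get() for _ in range(n)]
    thread.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(main(5)))  # [0, 1, 2, 3, 4]
```

Because asyncio.Queue is not thread-safe, the thread never touches it directly. Supporting both directions plus join()/task_done() and close(), as the examples below use, is the extra work a library like janus takes on.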

Usage example (Python 3.7+)

import asyncio
import janus


def threaded(sync_q: janus.SyncQueue[int]) -> None:
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main() -> None:
    queue: janus.Queue[int] = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())
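The annotations janus.SyncQueue[int] and janus.AsyncQueue[int] describe the interfaces of queue.sync_q and queue.async_q; the "add SyncQueue and AsyncQueue Protocols" comment below proposes them as typing.Protocol classes. The structural-typing idea can be sketched with the standard library alone. The protocol PutJoinQueue is hypothetical and covers only the two methods threaded() calls.

```python
import queue
import threading
from typing import List, Protocol, TypeVar, runtime_checkable

T_contra = TypeVar("T_contra", contravariant=True)


@runtime_checkable
class PutJoinQueue(Protocol[T_contra]):
    """Hypothetical protocol: just the methods threaded() needs from a sync queue."""

    def put(self, item: T_contra) -> None: ...

    def join(self) -> None: ...


def threaded(sync_q: PutJoinQueue[int]) -> None:
    # Accepts any object with put()/join(); no private class is referenced.
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


def consume(q: "queue.Queue[int]", out: List[int]) -> None:
    for _ in range(100):
        out.append(q.get())
        q.task_done()


if __name__ == "__main__":
    q: "queue.Queue[int]" = queue.Queue()
    out: List[int] = []
    consumer = threading.Thread(target=consume, args=(q, out))
    consumer.start()
    threaded(q)  # queue.Queue satisfies PutJoinQueue structurally
    consumer.join()
    print(isinstance(q, PutJoinQueue), out[:3])  # True [0, 1, 2]
```

Because protocols are matched by structure, any object with compatible put() and join() methods type-checks against PutJoinQueue without inheriting from it.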

Usage example (Python 3.5 and 3.6)

import asyncio
import janus

loop = asyncio.get_event_loop()


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue = janus.Queue()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()

try:
    loop.run_until_complete(main())
finally:
    loop.close()
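Both examples rely on the join()/task_done() contract that the synchronous face shares with the standard queue module: the producer's sync_q.join() returns only after the consumer has called task_done() once for every item that was put. A stdlib-only sketch of that accounting, using queue.Queue and a second thread in place of the coroutine (the function names are illustrative):

```python
import queue
import threading
import time
from typing import List


def consumer(q: "queue.Queue[int]", seen: List[int], n: int) -> None:
    # Mirrors async_coro(): one get() and one task_done() per item.
    for _ in range(n):
        item = q.get()
        time.sleep(0.001)  # simulate some work on the item
        seen.append(item)
        q.task_done()


def producer(q: "queue.Queue[int]", seen: List[int], n: int) -> int:
    # Mirrors threaded(): put everything, then wait for the consumer.
    for i in range(n):
        q.put(i)
    q.join()  # returns only after task_done() has been called n times
    return len(seen)  # so every item has already been processed here


if __name__ == "__main__":
    q: "queue.Queue[int]" = queue.Queue()
    seen: List[int] = []
    t = threading.Thread(target=consumer, args=(q, seen, 20))
    t.start()
    print(producer(q, seen, 20))  # 20
    t.join()
```

If the consumer forgot a task_done() call, join() in the producer would block forever, which is the usual symptom of a mismatched get()/task_done() pair.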

Communication channels

GitHub Discussions: https://github.com/aio-libs/janus/discussions

Feel free to post your questions and ideas here.

Gitter chat: https://gitter.im/aio-libs/Lobby

License

The janus library is offered under the Apache 2 license.

Thanks

Development of the library is sponsored by DataRobot (https://datarobot.com).

Comments
  • Error with Python 3.10: "ValueError: loop argument must agree with lock"

    I ran into this in my own project (see https://github.com/simonw/datasette/pull/1481), then tried using a fork of this project to run the unit tests against Python 3.10 and got the same error: https://github.com/simonw/janus/runs/3842463703?check_suite_focus=true

     ============================= test session starts ==============================
     platform linux -- Python 3.10.0, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
     rootdir: /home/runner/work/janus/janus
     plugins: cov-2.12.1, asyncio-0.15.1
     collected 72 items
     
     tests/test_async.py FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF                      [ 43%]
     tests/test_mixed.py .FFFFFFFFFFFFFFFFF                                   [ 68%]
     tests/test_sync.py FFFFFFFFFFFFFFFFFFFFFFF                               [100%]
     
     =================================== FAILURES ===================================
     __________________________ TestQueueBasic.test_empty ___________________________
     
     self = <test_async.TestQueueBasic object at 0x7fb0f561e4d0>
     
         @pytest.mark.asyncio
         async def test_empty(self):
     >       _q = janus.Queue()
     
     tests/test_async.py:65: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
     janus/__init__.py:39: in __init__
         self._async_not_empty = asyncio.Condition(self._async_mutex)
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
     
     self = <[AttributeError("'Condition' object has no attribute 'locked'") raised in repr()] Condition object at 0x7fb0f569a620>
     lock = <asyncio.locks.Lock object at 0x7fb0f569ada0 [unlocked]>
     
         def __init__(self, lock=None, *, loop=mixins._marker):
             super().__init__(loop=loop)
             if lock is None:
                 lock = Lock()
             elif lock._loop is not self._get_loop():
     >           raise ValueError("loop argument must agree with lock")
     E           ValueError: loop argument must agree with lock
    
    opened by simonw 10
  • accept loop parameter for Queue

    What do these changes do?

    Use case: creating the Queue in the app's main thread but using it with an async loop that runs in another thread (not the main one).

    Otherwise, one has to create the queue in that thread so that it picks up the correct loop.

    Are there changes in behavior for the user?

    No change in behavior if used as before; users can now pass a custom loop.

    Checklist

    • [x] I think the code is well written
    • [x] Add a new news fragment into the CHANGES folder

    opened by ali5h 9
  • expose the unfinished tasks variable

    I use queue.empty() and queue._parent._unfinished_tasks == 0 to check whether a queue is complete; this pull request exposes the _unfinished_tasks variable.

    My use case is 2 queues that can send work to each other until both are complete - is there a better way to achieve this? Note that I'm not trying to join() since a thread should wake up if there is more work to do.

    opened by richardbaronpenman 9
  • Fix Syntax error with Python 3.7

    async is a reserved keyword in Python 3.7.

    Fix issue #95. Most likely related: https://github.com/Rapptz/discord.py/commit/096584733e8a8025b13f46fa920e18abe19352c1

    opened by EcmaXp 7
  • `RuntimeError: no running event loop` after upgrading to 0.5.0

    Unfortunately, changes in #246 broke the code that relies on creating an instance of janus.Queue outside of the scope of a running event loop, for no obvious reason.

    Also, the behavior now contradicts that of asyncio.Queue:

    >>> import asyncio
    >>> asyncio.Queue()
    <Queue at 0x7f76be085730 maxsize=0>
    
    >>> import janus
    >>> janus.Queue()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/user/projects/project/.venv/lib64/python3.8/site-packages/janus/__init__.py", line 29, in __init__
        self._loop = current_loop()
    RuntimeError: no running event loop
    

    Is there any good reason to not just use asyncio.get_event_loop? From performance perspective it shouldn't be significant.

    opened by martyanov 6
  • doesn't play nicely with concurrent.futures.ProcessPoolExecutor

    If the executor passed to loop.run_in_executor(executor, ...) is a ProcessPoolExecutor instead of None, exceptions are raised, such as failures to serialize thread lock objects and callItems.

    opened by FirefighterBlu3 5
  • Inaccurate minimum Python version (3.5.3) because of typing.Deque

    The class typing.Deque was first introduced in Python 3.5.4. The parameter for package dependencies should be updated to: python_requires='>=3.5.4'

    opened by farax4de 4
  • Add typing inside the code and few other lint improvements

    What do these changes do?

    1. Removed stub file
    2. Source code fully annotated
    3. Added bandit and pyroma linters

    Are there changes in behavior for the user?

    Related issue number

    Checklist

    • [ ] I think the code is well written
    • [ ] Unit tests for the changes exist
    • [ ] Documentation reflects the changes
    • [ ] If you provide code modification, please add yourself to CONTRIBUTORS.txt
      • The format is <Name> <Surname>.
      • Please keep alphabetical order, the file is sorted by names.
    • [ ] Add a new news fragment into the CHANGES folder
      • name it <issue_id>.<type> (e.g. 588.bugfix)
      • if you don't have an issue_id change it to the pr id after creating the PR
      • ensure type is one of the following:
        • .feature: Signifying a new feature.
        • .bugfix: Signifying a bug fix.
        • .doc: Signifying a documentation improvement.
        • .removal: Signifying a deprecation or removal of public API.
        • .misc: A ticket has been closed, but it is not of interest to users.
      • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.
    opened by jettify 4
  • add SyncQueue and AsyncQueue Protocols

    What do these changes do?

    Add protocols to avoid the use of private classes _SyncQueueProxy and _AsyncQueueProxy when adding type hints.

    Are there changes in behavior for the user?

    I assume not, but it would be good if somebody more experienced could check this.

    Related issue number

    See https://github.com/aio-libs/janus/issues/372

    This PR is not intended to be directly merged, but to open a discussion about this approach.

    Using this approach, the following seems to work for adding type hints:

    import asyncio
    import janus
    
    
    def threaded(sync_q: janus.SyncQueue[int]):
        for i in range(100):
            sync_q.put(i)
        sync_q.join()
    
    
    async def async_coro(async_q: janus.AsyncQueue[int]):
        for i in range(100):
            val = await async_q.get()
            assert val == i
            async_q.task_done()
    
    
    async def main():
        queue: janus.Queue[int] = janus.Queue()
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(None, threaded, queue.sync_q)
        await async_coro(queue.async_q)
        await fut
        queue.close()
        await queue.wait_closed()
    
    
    asyncio.run(main())
    
    

    Full disclaimer: I never used Protocols before. I skimmed https://www.python.org/dev/peps/pep-0544 and watched https://www.youtube.com/watch?v=kDDCKwP7QgQ Also, I only recently started using typing with Python.

    While this approach seems to work, it looks rather verbose.

    opened by rbuffat 3
  • What is the correct way to type annotate a janus.Queue?

    When I try to add type hints to the example, pylance creates reportPrivateUsage warnings.

    import asyncio
    import janus
    
    
    def threaded(sync_q: janus._SyncQueueProxy[int]):
        for i in range(100):
            sync_q.put(i)
        sync_q.join()
    
    
    async def async_coro(async_q: janus._AsyncQueueProxy[int]):
        for i in range(100):
            val = await async_q.get()
            assert val == i
            async_q.task_done()
    
    
    async def main():
        queue: janus.Queue[int] = janus.Queue()
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(None, threaded, queue.sync_q)
        await async_coro(queue.async_q)
        await fut
        queue.close()
        await queue.wait_closed()
    
    
    asyncio.run(main())
    

    Warnings:

    "_SyncQueueProxy" is private and used outside of the module in which it is declared

    "_AsyncQueueProxy" is private and used outside of the module in which it is declared

    What is the preferred way to add type hints to janus Queues?

    opened by rbuffat 3
  • Fix compatibility with Python 3.10, refs #358

    What do these changes do?

    Tests now also run against Python 3.10, and code incorporates a workaround for a bug in Python 3.10 that affects Janus.

    Related issue number

    • #358

    Checklist

    • [x] I think the code is well written
    • [x] Unit tests for the changes exist
    • [x] Documentation reflects the changes - (not needed)
    • [ ] Add a new news fragment into the CHANGES folder (skipped, not sure how to do this)
    opened by simonw 3
  • Add CodeQL workflow

    Hi aio-libs/janus!

    This is not an automatic, 🤖-generated PR: as you can check in my GitHub profile, I work for GitHub and I am part of the GitHub Security Lab, which is helping out with the migration of LGTM configurations to Code Scanning. You might have heard that we've integrated LGTM's underlying CodeQL analysis engine natively into GitHub. The result is GitHub code scanning!

    With LGTM fully integrated into code scanning, we are focused on improving CodeQL within the native GitHub code scanning experience. In order to take advantage of current and future improvements to our analysis capabilities, we suggest you enable code scanning on your repository. Please take a look at our blog post for more information.

    This pull request enables code scanning by adding an auto-generated codeql.yml workflow file for GitHub Actions to your repository; take a look! We tested it before opening this pull request, so all should be working ✔️. In fact, you might already have seen some alerts appear on this pull request!

    Where needed and if possible, we’ve adjusted the configuration to the needs of your particular repository. But of course, you should feel free to tweak it further! Check this page for detailed documentation.

    Questions? Check out the FAQ below!

    FAQ

    How often will the code scanning analysis run?

    By default, code scanning will trigger a scan with the CodeQL engine on the following events:

    • On every pull request — to flag up potential security problems for you to investigate before merging a PR.
    • On every push to your default branch and other protected branches — this keeps the analysis results on your repository’s Security tab up to date.
    • Once a week at a fixed time — to make sure you benefit from the latest updated security analysis even when no code was committed or PRs were opened.

    What will this cost?

    Nothing! The CodeQL engine will run inside GitHub Actions, making use of your unlimited free compute minutes for public repositories.

    What types of problems does CodeQL find?

    The CodeQL engine that powers GitHub code scanning is the exact same engine that powers LGTM.com. The exact set of rules has been tweaked slightly, but you should see almost exactly the same types of alerts as you were used to on LGTM.com: we’ve enabled the security-and-quality query suite for you.

    How do I upgrade my CodeQL engine?

    No need! New versions of the CodeQL analysis are constantly deployed on GitHub.com; your repository will automatically benefit from the most recently released version.

    The analysis doesn’t seem to be working

    If you get an error in GitHub Actions that indicates that CodeQL wasn’t able to analyze your code, please follow the instructions here to debug the analysis.

    How do I disable LGTM.com?

    If you have LGTM’s automatic pull request analysis enabled, then you can follow these steps to disable the LGTM pull request analysis. You don’t actually need to remove your repository from LGTM.com; it will automatically be removed in the next few months as part of the deprecation of LGTM.com (more info here).

    Which source code hosting platforms does code scanning support?

    GitHub code scanning is deeply integrated within GitHub itself. If you’d like to scan source code that is hosted elsewhere, we suggest that you create a mirror of that code on GitHub.

    How do I know this PR is legitimate?

    This PR is filed by the official LGTM.com GitHub App, in line with the deprecation timeline that was announced on the official GitHub Blog. The proposed GitHub Action workflow uses the official open source GitHub CodeQL Action. If you have any other questions or concerns, please join the discussion here in the official GitHub community!

    I have another question / how do I get in touch?

    Please join the discussion here to ask further questions and send us suggestions!

    opened by Kwstubbs 0
  • No running event loop

    I am having trouble creating the janus Queue outside of asyncio. It requires me to make dummy background tasks just to create the object, since no running loop exists. I wonder if this could be lazy, or if the loop could be passed to the constructor? It makes sync-to-async code a tad harder.

    opened by aaronclong 0
  • Added the possibility of synchronous initialization

    What do these changes do?

    Prior to this commit, you could only initialize a queue in an asynchronous function. Now it is also possible in a synchronous one, but you need to pass the event loop (even an empty or not-yet-running one) to the constructor.

    It is just a new way to initialize janus, but it gives more ways to handle exceptions and manage code.

    (Minimum code to understand the changes)

    Before

    import asyncio
    import janus
    
    async def async_f(async_q: janus.AsyncQueue[int]):
        ...
    def sync_f(sync_q: janus.SyncQueue[int]):
        ...
    async def main() -> None:
        loop = asyncio.get_running_loop()
        queue: janus.Queue[int] = janus.Queue()
    
        loop.run_in_executor(None, sync_f, queue.sync_q)
        await async_f(queue.async_q)
        ...
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Now

    import asyncio
    import janus
    
    async def async_f(async_q: janus.AsyncQueue[int]):
        ...
    def sync_f(sync_q: janus.SyncQueue[int]):
        ...
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        # the loop= argument is what this PR proposes
        queue: janus.Queue[int] = janus.Queue(loop=loop)
        loop.create_task(async_f(queue.async_q))
        loop.run_in_executor(None, sync_f, queue.sync_q)
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
    
    

    Are there changes in behavior for the user?

    There are no behavior changes for users.

    Checklist

    • [X] I think the code is well written
    • [ ] Unit tests for the changes exist
    • [ ] Documentation reflects the changes
    • [ ] Add a new news fragment into the CHANGES folder
      • name it <issue_id>.<type> (e.g. 588.bugfix)
      • if you don't have an issue_id change it to the pr id after creating the PR
      • ensure type is one of the following:
        • .feature: Signifying a new feature.
        • .bugfix: Signifying a bug fix.
        • .doc: Signifying a documentation improvement.
        • .removal: Signifying a deprecation or removal of public API.
        • .misc: A ticket has been closed, but it is not of interest to users.
      • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.
    opened by s0d3s 0
  • Actually include LICENSE file

    What do these changes do?

    The LICENSE file is missing from the distribution. This change fixes that.

    Are there changes in behavior for the user?

    Yes. Users can read the license file, and packagers no longer need hacks to package it.

    Related issue number

    No issues.

    Checklist

    • [x] I think the code is well written
    • [x] Unit tests for the changes exist
    • [x] Documentation reflects the changes
    • [x] Add a new news fragment into the CHANGES folder
      • name it <issue_id>.<type> (e.g. 588.bugfix)
      • if you don't have an issue_id change it to the pr id after creating the PR
      • ensure type is one of the following:
        • .feature: Signifying a new feature.
        • .bugfix: Signifying a bug fix.
        • .doc: Signifying a documentation improvement.
        • .removal: Signifying a deprecation or removal of public API.
        • .misc: A ticket has been closed, but it is not of interest to users.
      • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

    (I've checked all the checkboxes even though they are not applicable; there is no CHANGES folder in this repository.)

    opened by rominf 0
  • Double-ended Queue Support

    Hey! I recently found your library and it's really helping me with communication between sub-thread coroutines and main-thread functions. One of my use cases needs the ability to clear all the already existing messages in a queue, or to insert a new message at the end of the queue. That is not possible with a normal queue, so I wanted to know if there are any plans for double-ended queue support any time soon. Thanks!

    opened by ronigober 0
  • Performance benefits?

    So I've been testing the performance a bit, varying e.g.

    • 1:1, 16:1, 64:1 producer : consumer ratio
    • normal/lifo/priority queue etc
    • asyncio/uvloop event loop
    • async -> async, async -> sync, sync -> sync
    • (probably more)

    I found that janus queues are ~5x slower in sync->sync, ~9x slower in sync->async, and ~15x slower in async->async. This is pretty much consistent across all parameter sets.

    This confirmed my suspicion that the performance gain of parallel computation is often less than the cost of using e.g. threading.Lock a lot (the GIL certainly doesn't help either).

    Right now, I can imagine that many users have incorrect expectations of janus. To avoid this, you could add an example that shows how janus can outperform single-threaded asyncio, by employing multiple threads. Additionally, a caveat about janus' performance would be helpful.

    opened by jorenham 1
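Several comments above (the 0.5.0 `RuntimeError: no running event loop` report, "No running event loop", and the two proposals to accept a loop argument) come down to when a queue binds to an event loop. The 0.5.0 traceback shows janus.Queue() calling current_loop() during construction, which fails outside a running loop just as asyncio.get_running_loop() does. The stdlib-only sketch below reproduces that failure and the workaround the README examples already use: construct the loop-bound object inside a coroutine. The function make_loop_bound is a hypothetical stand-in, not janus code.

```python
import asyncio
from typing import Optional


def make_loop_bound() -> asyncio.AbstractEventLoop:
    # Hypothetical stand-in for a constructor that binds to the running loop
    # at creation time, as the 0.5.0 traceback shows janus.Queue() doing.
    return asyncio.get_running_loop()


def try_outside_loop() -> Optional[str]:
    try:
        make_loop_bound()
    except RuntimeError as exc:
        return str(exc)
    return None


async def create_inside_loop() -> bool:
    # Workaround: construct the object from code already running on the loop.
    loop = make_loop_bound()
    return loop is asyncio.get_running_loop()


if __name__ == "__main__":
    print(try_outside_loop())                 # no running event loop
    print(asyncio.run(create_inside_loop()))  # True
```

Passing the loop explicitly, as the loop-parameter proposals suggest, is the other way around this: the object then no longer has to discover the loop at construction time.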
Releases: v1.0.0
Owner: aio-libs (The set of asyncio-based libraries built with high quality)