A fast and reliable background task processing library for Python 3.

Overview

dramatiq

Build Status PyPI version Documentation Discuss

A fast and reliable distributed task processing library for Python 3.


Changelog: https://dramatiq.io/changelog.html
Community: https://groups.io/g/dramatiq-users
Documentation: https://dramatiq.io


Sponsors

Installation

If you want to use it with RabbitMQ

pip install 'dramatiq[rabbitmq, watch]'

or if you want to use it with Redis

pip install 'dramatiq[redis, watch]'

Quickstart

Make sure you've got RabbitMQ running, then create a new file called example.py:

import dramatiq
import requests
import sys


@dramatiq.actor
def count_words(url):
    response = requests.get(url)
    count = len(response.text.split(" "))
    print(f"There are {count} words at {url!r}.")


if __name__ == "__main__":
    count_words.send(sys.argv[1])

In one terminal, run your workers:

dramatiq example

In another, start enqueueing messages:

python example.py http://example.com
python example.py https://github.com
python example.py https://news.ycombinator.com

Check out the user guide to learn more!

License

dramatiq is licensed under the LGPL. Please see COPYING and COPYING.LESSER for licensing details.

Comments
  • Class-based actor

    Class-based actor

    Hello,

    I've been looking for a celery alternative for quite a while, thanks for taking the time to make this.

    A suggestion i'd like to make is to add the ability of defining class-based actor where grouping common functionality among different "tasks" or "actors" is possible.

    Thank you.

    enhancement 
    opened by rakanalh 16
  • `ResultTimeout` with Redis

    `ResultTimeout` with Redis

    Issues

    GitHub issues are for bugs. If you have questions, please ask them on the discussion board.

    Checklist

    • [x] Does your title concisely summarize the problem?
    • [x] Did you include a minimal, reproducible example?
    • [x] What OS are you using?
    • [x] What version of Dramatiq are you using?
    • [x] What did you do?
    • [x] What did you expect would happen?
    • [x] What happened?

    What OS are you using?

    CentOs7

    What version of Dramatiq are you using?

    1.5.0

    What did you do?

    called function completes and returns a list successfully ( I have chekd this with print statement, and it prints correctly whenever I call the function asynchronously ), but when I try to retrieve the retruned result with tacotron.send(model).get_result(backend = result_backend, block=True), I get:

    Traceback (most recent call last):
      File "actor.py", line 55, in <module>
        ret = tacotron.send().get_result(backend = result_backend, block=True)
      File "/home/msl/.virtualenvs/venv_tts/lib/python3.5/site-packages/dramatiq-1.5.0-py3.5.egg/dramatiq/message.py", line 147, in get_result
        return backend.get_result(self, block=block, timeout=timeout)
      File "/home/msl/.virtualenvs/venv_tts/lib/python3.5/site-packages/dramatiq-1.5.0-py3.5.egg/dramatiq/results/backends/redis.py", line 81, in get_result
        raise ResultTimeout(message)
    dramatiq.results.errors.ResultTimeout: tacotron()
    

    this is how I defined tacotron(model):

    @dramatiq.actor
    def tacotron(model):
        return model.GetMel("President Trump met with other leaders at the Group of 20 conference.", 0)
    

    I have put print statement inside GetMel of model, and it prints correctly -- i.e. right before it returns, I check the content of the list that is being returned, and they are fine. It prints within 1 second of the request.

    Result backend: Redis I'm not sure if this is correct. How I installed and started Redis:

    sudo pip install redis
    sudo yum install redis
    sudo systemctl start redis
    

    Broker: RabbitMQ

    What did you expect would happen?

    for tacotron.send(model).get_result(backend = result_backend, block=True) to return successfully

    What happened?

    get_result(block=True) hung then errored

    Traceback (most recent call last):
      File "actor.py", line 55, in <module>
        ret = tacotron.send().get_result(backend = result_backend, block=True)
      File "/home/msl/.virtualenvs/venv_tts/lib/python3.5/site-packages/dramatiq-1.5.0-py3.5.egg/dramatiq/message.py", line 147, in get_result
        return backend.get_result(self, block=block, timeout=timeout)
      File "/home/msl/.virtualenvs/venv_tts/lib/python3.5/site-packages/dramatiq-1.5.0-py3.5.egg/dramatiq/results/backends/redis.py", line 81, in get_result
        raise ResultTimeout(message)
    dramatiq.results.errors.ResultTimeout: tacotron()
    
    opened by isaacleeai 15
  • fix: race condition on join() in the redis broker.

    fix: race condition on join() in the redis broker.

    This fixes #480

    I'm not sure how to test this, since the race condition is not easy to trigger. @Bogdanp do you have any suggestions?

    Also looking at the stub broker it seems that it may have a similar race condition, but I've never triggered a similar issue with it, so I'm not sure if the change (swapping the order or the delay queue and normal one) is needed https://github.com/Bogdanp/dramatiq/blob/2d0ca2519a0289946337b308e450a915cf5fd869/dramatiq/brokers/stub.py#L153-L164

    opened by CaselIT 14
  • (Error) Logging misbehaves when run in Docker

    (Error) Logging misbehaves when run in Docker

    What OS are you using?

    Debian Stretch inside Docker

    What version of Dramatiq are you using?

    1.5.0

    What did you do?

    Run dramatiq not.existing; echo $? in a Docker container.

    A simple docker run --rm -ti python:3.7-stretch /bin/bash followed by pip install dramatiq is the fastest way to get a repro.

    What did you expect would happen?

    The worker fails to boot and complains with something along the lines of ModuleNotFoundError: No module named 'not', the return code is always 2.

    When running inside an iterm2 Mac zsh shell this is exactly what happens.

    What happened?

    When running below docker most of the times the output is completely swallowed. Sometimes only [dramatiq.MainProcess] [INFO] Dramatiq '1.5.0' is booting up. is printed.

    Every 10th or so time the complete expected output can the seen.

    The exit code is oscillating between 0 and 2.

    I did not have the time yet to investigate but it seems to be related to the logging pipe of the workers. If I tweak setup_worker_logging not to pass the pipe the errors are reported as expected.

    I was running into this problem when adding Dramatiq in a Kubernetes setup. I cannot say yet if there are additional problems when the module can be successfully imported.

    needs investigation 
    opened by aberres 13
  • Handling task failures+retries during testing

    Handling task failures+retries during testing

    As someone new to dramatiq, the retry middleware may cause task failures during testing to appear as if the test worker is hanging indefinitely or is otherwise failing to execute tasks. What actually happened:

    • Some of my application task code failed during testing, causing the retry middleware to kick in.
    • The retry middleware defaults to a 7-day maximum backoff.
    • pytest tends to capture stdout/stderr, and terminating the test run causes that output to be lost.

    Given the lack of output and apparent hang, my initial reaction was confusion and to assume that I had somehow misconfigured the test worker. The reality was that everything was working correctly, and pytest was just eating the output.

    The unit testing guide should probably mention that apparent hangs may be the result of the retry middleware, and that they should try temporarily setting max_retries=0 or consider disabling the retry middleware during testing.

    enhancement 
    opened by rpkilby 12
  • Fix memory leak in dramatiq.Worker:process_message

    Fix memory leak in dramatiq.Worker:process_message

    The memory leak takes place only when an exception is thrown.

    The problem has been traced down to the exception handler of process_message and more specifically to how the exception object is handled and stored in order to be referred back to down the road, eg. for debugging purposes or by the results' middleware. When message.stuff_exception(e) is called, a cyclic reference is created, which causes objects to not be garbage-collected (at least not until the cyclic GC kicks in).

    In general, a cyclic reference is created when the exception object is stored in a variable x, which is part of the stack frame. The exception object references the stack frame, thus the variable x, via its __traceback__ attribute. But now x references the exception object, too, thus the stack frame. So, a cyclic reference is created!

    In this particular case, the reference cycle created is:

    message -> message._exception -> e -> stackframe -> message

    In general, it is not recommended to store exception objects in local vars. However, if that's necessary, such variables should always be (explicitly) cleared in order to break the cyclic reference - usually in a try/finally statement.

    In that sense, it comes in handy that a try/finally statement is already being used. By setting message._exception to None at the very end the reference cycle is broken and objects are garbage-collected soon enough.

    The following piece of code can be used to re-produce the problem:

    import dramatiq
    
    
    @dramatiq.actor
    def foo(*args, **kwargs):
        a = tuple(range(5000000))                                        
        raise Exception('bar')                                                      
    

    After running for _ in range(5): foo.send() the memory leak is evident:

    Figure_1

    When the fix is applied:

    Figure_1_no_leak

    opened by pchristos 11
  • Integration with new sentry client

    Integration with new sentry client

    opened by dima-takoy-zz 11
  • ConcurrentRateLimiter limit=1, with no retries?

    ConcurrentRateLimiter limit=1, with no retries?

    First of all - Big thanks @Bogdanp for creating a truly pythonic task processor! Celery has always given me a kind of bitter after taste..

    I'm working towards a backend system that can't handle concurrent writes to the same database table. On top of that the task writing to this system might fail half-way and then we have no way to know where to resume when the task is retried (unless I keep state in the app which I'd like to avoid to keep things simple), so I've passed max_retries=0 to dramatic.actor and made the task notify me of any failures.

    So my thought was that dramatiq.rate_limits.ConcurrentRateLimiter would be a perfect fit here. In the example in the cookbook though you're using it inside the task function. But I guess this wouldn't work in my scenario as the task will never be retried... Is there any way to have the mutex apply to the task itself so that it never even starts if another instance of that task is already executing?

    In my head an API like this would do it for my scenario:

    import dramatiq
    import time
    
    from dramatiq.rate_limits import ConcurrentRateLimiter
    from dramatiq.rate_limits.backends import RedisBackend
    
    backend = RedisBackend()
    DISTRIBUTED_MUTEX = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)
    
    @dramatiq.actor(rate_limiter=DISTRIBUTED_MUTEX, max_retries=0)
    def one_at_a_time():
      print("I'll never run concurrently! And never twice!")
    

    Or maybe I'm going all wrong with this?

    Any input would be greatly appreciated!

    enhancement question 
    opened by jacobsvante 11
  • FileNotFoundError thrown when queueing from flask app

    FileNotFoundError thrown when queueing from flask app

    What OS are you using?

    Running in default Python 3.7 Docker container (Debian)

    What version of Dramatiq are you using?

    1.4.3

    What did you do?

    Created an actor using the decorator. Then called that actor using send(). This was called from the request context in a Flask application.

    What did you expect would happen?

    Expected message to be queued in rabbitmq

    What happened?

    FileNotFoundError. Doesn't consistently happen, but periodically it throws this error.

    FileNotFoundError: [Errno 2] No such file or directory
      File "flask/app.py", line 2309, in __call__
        return self.wsgi_app(environ, start_response)
      File "appoptics_apm/middleware.py", line 142, in __call__
        result = self.wrapped_app(environ, wrapped_start_response)
      File "werkzeug/contrib/fixers.py", line 152, in __call__
        return self.app(environ, start_response)
      File "raven/middleware.py", line 100, in __call__
        iterable = self.application(environ, start_response)
      File "flask/app.py", line 2295, in wsgi_app
        response = self.handle_exception(e)
      File "flask_restful/__init__.py", line 273, in error_router
        return original_handler(e)
      File "flask/app.py", line 1741, in handle_exception
        reraise(exc_type, exc_value, tb)
      File "flask/_compat.py", line 35, in reraise
        raise value
      File "flask/app.py", line 2292, in wsgi_app
        response = self.full_dispatch_request()
      File "flask/app.py", line 1815, in full_dispatch_request
        rv = self.handle_user_exception(e)
      File "flask_restful/__init__.py", line 273, in error_router
        return original_handler(e)
      File "flask/app.py", line 1718, in handle_user_exception
        reraise(exc_type, exc_value, tb)
      File "flask/_compat.py", line 35, in reraise
        raise value
      File "flask/app.py", line 1813, in full_dispatch_request
        rv = self.dispatch_request()
      File "flask/app.py", line 1799, in dispatch_request
        return self.view_functions[rule.endpoint](**req.view_args)
      File "flask/views.py", line 88, in view
        return self.dispatch_request(*args, **kwargs)
      File "flask/views.py", line 158, in dispatch_request
        return meth(*args, **kwargs)
      File "flask_login/utils.py", line 261, in decorated_view
        return func(*args, **kwargs)
      File "sylo/permissions/access.py", line 96, in wrapped
        return f(*args, **kwargs)
      File "webargs/core.py", line 447, in wrapper
        return func(*new_args, **kwargs)
      File "<my-file>", line 47, in wrapper
        result = func(*args, **kwargs)
      File "<my-file>", line 486, in func
        <my_actor>.send(one_id, two_id)
      File "dramatiq/actor.py", line 179, in send
        return self.send_with_options(args=args, kwargs=kwargs)
      File "dramatiq/actor.py", line 198, in send_with_options
        return self.broker.enqueue(message, delay=delay)
      File "dramatiq/brokers/rabbitmq.py", line 250, in enqueue
        properties=properties,
      File "pika/adapters/blocking_connection.py", line 2206, in publish
        immediate=immediate)
      File "pika/channel.py", line 425, in basic_publish
        (properties, body))
      File "pika/channel.py", line 1328, in _send_method
        self.connection._send_method(self.channel_number, method, content)
      File "pika/connection.py", line 2341, in _send_method
        self._send_message(channel_number, method, content)
      File "pika/connection.py", line 2355, in _send_message
        self._send_frame(frame.Method(channel_number, method_frame))
      File "pika/connection.py", line 2327, in _send_frame
        self._flush_outbound()
      File "pika/adapters/base_connection.py", line 330, in _flush_outbound
        self._manage_event_state()
      File "pika/adapters/base_connection.py", line 536, in _manage_event_state
        self.event_state)
      File "pika/adapters/select_connection.py", line 446, in update_handler
        self._poller.update_handler(fileno, events)
      File "pika/adapters/select_connection.py", line 639, in update_handler
        events_to_set=events_set)
      File "pika/adapters/select_connection.py", line 1152, in _modify_fd_events
        self._poll.modify(fileno, events)
    
    needs investigation 
    opened by Nizebulous 10
  • Task routing

    Task routing

    does dramatiq has anything like celery about task routing.

    I didn't find any api to send task with queue_name, but I can build message myself to send task to specific queue.

     Message(
                queue_name="only_i_can_receive",
                actor_name="count_words",
                args=("https://www.google.com", ),
            )
    broker.enqueue(message, delay=delay)
    

    but i can't start worker only consumer for specific queue like celery.

    celery worker tasks -Q tasks.to.nodes.1
    
    enhancement 
    opened by Timaqf 10
  • actor declare on a method or classmethod

    actor declare on a method or classmethod

    Hi,

    This is not a bug but a wish, I would use the decorator actor on a method or a classmethod have you a solution ?

    exemple:

    
    class foo:
        @dramatiq.actor()
        def count_worlds(url):
              pass
    
    
    question 
    opened by jssuzanne 10
  • Run out of ports when 65000+ messages are sent via 65000+ threads

    Run out of ports when 65000+ messages are sent via 65000+ threads

    Issues

    GitHub issues are for bugs. If you have questions, please ask them on the discussion board.

    Checklist

    • [X] Does your title concisely summarize the problem?
    • [X] Did you include a minimal, reproducible example?
    • [X] What OS are you using?
    • [X] What version of Dramatiq are you using?
    • [X] What did you do?
    • [X] What did you expect would happen?
    • [X] What happened?

    What OS are you using?

    Happens both on Ubuntu 16.04 or macOS 10.13.3

    What version of Dramatiq are you using?

    1.13.0

    What did you do?

    Tried to create 65000+ threads and send one message per thread

    What did you expect would happen?

    Messages to be dispatched without any error

    What happened?

    Got an exception - OSError98/99

    opened by manjusfi 0
  • Discussion: Resque-like priority queues

    Discussion: Resque-like priority queues

    Situation

    The default behavior of Dramatiq when multiple queues are present (either all of them or a subset), is to fetch data from them using the consumer threads. This in practice means that all queues are drained equally (assuming they all have messages in them).

    In contrast, Resque, grabs messages in the order the queues are set when starting the workers. For example, given the queues low, high and default, one could start a worker with --queues=high,default,low and the queues would be drained in order. This ofc could lead to the low queue being starved if high or default always had a constant influx of new messages. To overcome this, often there would be multiple workers with different settings like --queues=low,default,high to ensure that all queues had at least one workers dedicated to them.

    When compared to dramatiq, resque allows creating dedicated workers for a given queue, that when idle will try to process jobs from other queues. Dramatiq can have dedicated workers for a queue, however, they will stay idle if that queue is empty.

    Potential Solutions

    Middleware

    Since dramatiq has good middleware support, one could write a new middleware to implement this behavior. However, it seems that there aren't any available hooks that could be used to handle this in an efficient way.

    Ideally, no tasks should be waiting in a worker, while a potential another worker could be handling it. We should also avoid taking tasks off a queue, and then adding it back, repeatedly.

    Builtin

    Changing how dramatiq works probably isn't the best approach, since it could break setups that are functional with the current behavior. We could have a setting to toggle this behavior. Something like --drain-queues-in-order. IMO this solution is less preferred, since it adds more complexity and a somewhat opinionated way of how things should work.

    Questions

    1. Is anyone else interested in this? Is this even useful or desired?
    2. Can any of the current hooks provided by the middleware architecture allow this to be implemented in a reasonable manner? Preferentially without changing dramatiq itself
    3. If not, what do we need in order do implement it the "middleware way"?

    I am trying to get an MVP working using the middleware approach, but so far nothing. Still getting familiar with dramatiq's internals

    Note: I am starting the discussion here because the reddit seems to be abandoned

    opened by h3nnn4n 0
  • feature request: Retries middleware to set max_retries as an option

    feature request: Retries middleware to set max_retries as an option

    As discussed in #425 it is sometimes useful to do some special handling using the on_failure callback after the last retry. However, at the moment, the on_failure callback does not have any way to dynamically determine if the current retry is the last.

    In other words, I think it could be useful to either pass the max_retries information in the message options or add another callback on_last_failure.

    Unless I'm missing something, I think the first option should be fairly straightforward to implement by adding a before_enqueue method to the Retries middleware

    class Retries(Middleware):
        ...
        def before_enqueue(self, broker, message, delay):
            actor = broker.get_actor(message.actor_name)
            max_retries = message.options.get("max_retries") or actor.options.get(
                "max_retries", self.max_retries
            )
            message.options["max_retries"] = self.max_retries
    

    If you think this could be a useful feature and that the proposed solution makes sense, I'd be happy to open a PR with the change.


    As a workaround I'm currently retrieving the actor in the on_failure handler

    @dramatiq.actor
    def my_callback(message_data, exception_data):
        actor = broker.get_actor(message_data["actor_name"])
        max_retries = actor.options.get("max_retries", 0)
    
        retries = message_data["options"]["retries"]
        if retries > max_retries:
            print("handling the error")
    
    opened by giuppep 0
  • New release, please!

    New release, please!

    Hi, @Bogdanp! Maybe it's time to release a new version of dramatiq? Python 3.11 was released not too long ago -- it would be great to have a version compatible with modern python 😎 Of course, we can install a package with reference to a specific commit, but it's still better and more convinient to get dramatiq from PyPi, for example, v1.14.0.

    opened by anton-petrov 0
  • Priority doesn't seem to behave as expected

    Priority doesn't seem to behave as expected

    Issues

    Note -- I'm running Django-dramatiq

    I need to use dramatiq to execute higher-priority tasks before lower-priority tasks, and I have a fairly long queue of tasks (maybe a few thousand running on only a 6 to 8 workers). However, I have very few high-priority tasks, and I'd expect those to always be executed first. But, the behavior I observe is that dramatiq will keep choosing to run enqueued actors with lower priorities (sometimes).

    As far as I understood from reading a bunch of issues what I'd actually need here is "queue priority" or some such, which is only supported by RabbitMQ. But I'd be nice to double-check and see I'm not doing something wrong, and also, if priority is indeed limited to a specific situation (e.g. priority of resource allocation to already running tasks) I'd be nice if the documentation was more specific about this.

    Checklist

    • [x] Does your title concisely summarize the problem?
    • [ ] Did you include a minimal, reproducible example?
    • [x] What OS are you using?
    • [x] What version of Dramatiq are you using?
    • [x] What did you do?
    • [x] What did you expect would happen?
    • [x] What happened?

    What OS are you using?

    Ubuntu 22.04 LTS

    What version of Dramatiq are you using?

    1.13.0

    What did you do?

    I need to use dramatiq to execute higher-priority tasks before lower-priority tasks, and I have a fairly long queue of tasks (maybe a few thousand running on only a 6 to 8 workers). However, I only enqueued a few high-priority tasks

    What did you expect would happen?

    I have very few high-priority tasks, and I'd expect those to always be executed first.

    What happened?

    The behavior I observe is that dramatiq will keep choosing to run enqueued actors with lower priorities (sometimes).

    opened by George3d6 0
  • RabbitMQ broker doesn't seem to retry publishing on SSL socket errors

    RabbitMQ broker doesn't seem to retry publishing on SSL socket errors

    Issues

    GitHub issues are for bugs. If you have questions, please ask them on the discussion board.

    Checklist

    • [x] Does your title concisely summarize the problem?
    • [x] Did you include a minimal, reproducible example?
    • [x] What OS are you using?
    • [x] What version of Dramatiq are you using?
    • [x] What did you do?
    • [x] What did you expect would happen?
    • [x] What happened?

    What OS are you using?

    Ubuntu 20.04 and MacOS 12.6

    What version of Dramatiq are you using?

    Dramatiq 1.1.13

    What did you do?

    Enqueue messages over a long running process.

    What did you expect would happen?

    Messages should be published reliably.

    What happened?

    We've recently migrated to a RabbitMQ deployment on AmazonMQ and connect over SSL (where as previously it was plain text). We're seeing the following error occasionally:

    Socket EOF; <ssl.SSLSocket fd=32, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('x.x.x.x', 40536)>
    

    We have a Django and Dramatiq deployment where both the Django and Dramatiq workers can publish messages. We've seen StreamLostErrors appear in our logs on long running processes, but have filtered them out assuming that Dramatiq's underlying retry mechanism will reopen the connection based on this Github Issue: https://github.com/Bogdanp/django_dramatiq/issues/85

    I believe this is where pika is raising the SSL error: https://github.com/pika/pika/blob/main/pika/adapters/utils/io_services_utils.py#L794-L797

    Browsing through the code, it looks like Dramatiq doesn't handle the SSL error, only subclasses of AMQPConnectionError or AMQPChannelError: https://github.com/Bogdanp/dramatiq/blob/master/dramatiq/brokers/rabbitmq.py#L341-L354

    opened by shshe 0
Releases(v1.13.0)
  • v1.13.0(Jun 17, 2022)

    Note: this version was released on 2022-04-02. I am just now backfilling a GitHub Release for it.

    Fixed

    Changed

    • Typing support has been improved. (#482, @staticdev)
    • The default broker now falls back to Redis if the RabbitMQ extras are not installed, in an attempt to make the getting started process easier. (#483, #486, @kurtmckee)
    Source code(tar.gz)
    Source code(zip)
  • v1.12.3(Jan 16, 2022)

  • v1.12.2(Jan 14, 2022)

  • v1.12.1(Dec 19, 2021)

    Fixed

    • Actors and messages can now specify 0 backoff. (@FinnLidbetter, #438)
    • An issue where Redis message ids could be put back onto the queue after ack/nack. (#442, #444)

    Removed

    • Dropped Python 3.5 support as it reached end-of-life.
    Source code(tar.gz)
    Source code(zip)
  • v1.12.0(Oct 23, 2021)

    Added

    • RabbitMQ messages now have a redelivered flag. (#405, @nffdiogosilva)
    • Time limits now work under gevent. (#408, @FinnLidbetter)
    • Shutdown notifications now work under gevent. (#426, @FinnLidbetter)

    Changed

    • The watchdog library is no longer being pinned to a specific version. (#428)
    • The Redis broker now limits unpacks to half the size of the Lua stack. (#433, #434, @ethervoid)

    Fixed

    • Async exceptions now correctly set the thread id on Python 3.7 and up. (#419, #420, @FinnLidbetter)
    Source code(tar.gz)
    Source code(zip)
  • v1.11.0(May 22, 2021)

    Added

    • dramatiq.Message.decode now raises a dramatiq.DecodeError on exception. (#375, @thomazthz)

    Changed

    • The RabbitMQ broker moves messages that fail to decode to the DLQ. (#375, @thomazthz)

    Fixed

    • The Redis broker is now more defensive in how it handles re-enqueueing messages. This should fix a potential race condition where a worker could hit its heartbeat timeout but still end up re-enqueueing messages (thus ending up with the same message id enqueued multiple times). (#266, #395)
    • A code path that could lead to an unbound variable error has now been fixed. (#382)
    • Deleting the connection off of a RabbitMQ broker now correctly closes it and its associated channel (if any) before removing it from the broker. (#381, #384)
    Source code(tar.gz)
    Source code(zip)
  • v1.10.0(Dec 21, 2020)

    Added

    • The RabbitMQ broker dead message TTL can now be configured via the dramatiq_dead_message_ttl environment variable. (#354, @evstratbg)
    • The CLI now supports referencing a callable to set up the broker on worker startup. (#350)
    • The --worker-shutdown-timeout flag. (#330, @mic47)

    Changed

    • The CLI raises an error when the --watch flag is set on unsupported platforms. (#326, #328, @CaselIT)

    Fixed

    • The CLI now returns code 1 when one of the workers is killed by an unhandled signal. (#334, @omegacoleman)
    • The results middleware now gracefully handles actor-not-found errors during nack. (#336, #337, @AndreCimander)
    • A memory bloat issue with tasks that raise exceptions. (#351)
    • CI on Windows. (#371, @gdvalle)
    Source code(tar.gz)
    Source code(zip)
  • v1.9.0(Oct 18, 2020)

    Added

    • A custom Redis connection can now be passed to the Redis broker via the new client keyword argument. (#274, @davidt99)
    • Message priority can now be changed in before_enqueue hooks. (#313, @thomazthz)
    • Support for storing actor exceptions. (#156)
    • Support for silent Retries. (#295)
    • Support for expected exceptions via the throws actor option. (#303, @takhs91)
    • Support for changing the consumer class in the RabbitMQ and Redis brokers. (#316, @AndreCimander)

    Changed

    • Worker processes are no longer daemons. (#289, #294, @takhs91)

    Fixed

    • A race condition during command line startup where the wrong exit codes could be returned when subprocesses failed. (#286)
    • A race condition between worker processes and fork processes during boot. (#297)
    • A logging race condition on Linux. (#171, #286)
    • fileno has been added to StreamablePipe. (#291, @takhs91)
    Source code(tar.gz)
    Source code(zip)
  • v1.8.1(Oct 18, 2020)

  • v1.8.0(Oct 18, 2020)

    Added

    • Support for forking and running arbitrary functions (so-called “fork functions”). (#127, #230)
    • The --fork-function flag.
    • The --skip-logging flag. (#263, @whalesalad)

    Fixed

    • An issue where the max_age parameter to Callbacks was being ignored. (#240, @evstratbg)
    • An issue with delaying pipelines. (#264, @synweap15)
    • An issue where the master process would sometimes hang when stopped. (#260, @asavoy)
    • An issue where the RedisBroker could sometimes prefetch more messages than it was configured to. (#262, @benekastah)
    • The StubBroker now flushes its dead letter queue when its flush_all method is called. (#247, @CapedHero)
    • The RedisBroker now takes the max lua stack size into account. This should fix certain heisenbugs that folks have encountered with that broker. (#259, @benekastah)
    Source code(tar.gz)
    Source code(zip)
  • v1.7.0(Sep 22, 2019)

    Added

    • Generic actors can now be passed custom actor registries. (#223, @jonathanlintott)
    • --use-spawn command line argument. (#227, #228, @jrusso1020)

    Changed

    • Uncaught exceptions within workers are now logged as errors rather than warnings. (#221, @th0th)
    Source code(tar.gz)
    Source code(zip)
  • v1.6.1(Sep 22, 2019)

    Added

    • RabbitmqBroker now supports multiple connection uris to be passed in via its url parameter. (#216, @wsantos)

    Changed

    • Updated allowed version range for prometheus-client. (#219, @robinro)
    Source code(tar.gz)
    Source code(zip)
  • v1.6.0(May 2, 2019)

    Added

    • dramatiq_queue_prefetch environment variable to control the number of messages to prefetch per worker thread. (#183, #184, @xelhark)
    • The RabbitMQ broker now retries the queue declaration process if an error occurs. (#179, @davidt99)
    • Support for accessing nested broker instances from the CLI. (#191, @bersace)
    • Support for eagerly raising actor exceptions in the joining thread with the StubBroker. (#195, #203)
    • Support for accessing the current message from an actor via CurrentMessage. (#208)

    Changed

    • Pinned pika version >1.0,<2.0. (#202)

    Fixed

    • An issue where workers would fail and never recover after the connection to Redis was severed. (#207)
    • pipe_ignore has been fixed to apply to the next message in line within a pipeline. (#194, @metheoryt)
    Source code(tar.gz)
    Source code(zip)
    dramatiq-1.6.0-py3-none-any.whl(100.83 KB)
    dramatiq-1.6.0.tar.gz(61.63 KB)
  • v1.5.0(Feb 18, 2019)

    Added

    • The RabbitMQ broker now supports native message priorities. (#157, @davidt99)
    • Support for specifying the actor class to @actor. (#169, @gilbsgilbs)

    Changed

    • Pika 0.13 is now required.

    Fixed

    • Consumers are now stopped after workers finish running their tasks. (#160, @brownan)
    • Worker logging on Python 3.7 is no longer delayed.
    Source code(tar.gz)
    Source code(zip)
  • v1.4.3(Jan 8, 2019)

  • v1.4.2(Jan 8, 2019)

    This release was removed from PyPI because it had a bad license classifier (GPLv3+ instead of LGPLv3+) due to a mistake on my part.

    Fixed

    • License classifier in PyPI package. There were no source code changes for this release.
    Source code(tar.gz)
    Source code(zip)
  • v1.4.1(Dec 30, 2018)

    Added

    • Support for redis-py 3.x. (#142, @maerteijn)

    Fixed

    • Workers wait for RMQ messages to be acked upon shutdown. (#148)
    • Pipelines no longer continue when a message is failed. (#151, @davidt99)
    • Log files now work under Windows. (#141, @ryansm1)
    Source code(tar.gz)
    Source code(zip)
  • v1.4.0(Nov 25, 2018)

    This is a big one!

    Added

    • Barriers.

    Changed

    • cli.main now takes an optional argument namespace so that users may define their own entrypoints. (#140, @maerteijn)
    • Actor "message received" and "completed in x ms" log messages are now logged with the DEBUG level instead of INFO level. This improves throughput and makes logging much less verbose.
    • The TimeLimit middleware no longer uses signals to trigger time limit handling. Instead it uses a background thread per worker process.
    • Dramatiq now shuts itself down if any of the workers die unexpectedly (for example, if one of them is killed by the OOM killer).
    • Windows is now supported (with some caveats)! (#119, @ryansm1)

    Fixed

    • Allow pipe_ignore option to be set at the actor level. (#100)
    • Result encoder now defaults to the global encoder. (#108, @xdmiodz)
    • Dot characters are now allowed in queue names. (#111)
    • Tests are now run on Windows. (#113, @ryansm1)
    Source code(tar.gz)
    Source code(zip)
  • v1.3.0(Jul 5, 2018)

    Changed

    • Upgraded prometheus_client to 0.2.x.
    • Bumped pika to version 0.12. Because of this change, the interrupt method on Broker and its usages within Worker have been dropped.
    • There is no longer a max message delay.

    Fixed

    • Brokers can now be passed an empty list of middleware. (#90)
    • Potential stack overflow when restarting Consumer threads. (#89)
    Source code(tar.gz)
    Source code(zip)
  • v1.2.0(Jun 1, 2018)

    Added

    • Support for worker heartbeats to RedisBroker.
    • maintenance_chance and heartbeat_timeout parameters to RedisBroker.
    • Interrupt base class for thread-interrupting exceptions. (@rpkilby)
    • ShutdownNotifications middleware. (@rpkilby)

    Changed

    • TimeLimitExceeded is now a subclass of Interrupt.

    Fixed

    • StubBroker.join and Worker.join are now more reliable.
    • Module import path is now prepended to search path rather than appended. This fixes an issue where importing modules with the same name as modules from site-packages would end up importing the modules from site-packages. (#88)
    • Prometheus middleware no longer wipes the prometheus data directory on startup. This fixes an issue with exporting application metrics along with worker metrics.
    Source code(tar.gz)
    Source code(zip)
Asynchronous tasks in Python with Celery + RabbitMQ + Redis

python-asynchronous-tasks Setup & Installation Create a virtual environment and install the dependencies: $ python -m venv venv $ source env/bin/activ

Valon Januzaj 40 Dec 03, 2022
RQ (Redis Queue) integration for Flask applications

Flask-RQ RQ (Redis Queue) integration for Flask applications Resources Documentation Issue Tracker Code Development Version Installation $ pip install

Matt Wright 205 Nov 06, 2022
Py_extract is a simple, light-weight python library to handle some extraction tasks using less lines of code

py_extract Py_extract is a simple, light-weight python library to handle some extraction tasks using less lines of code. Still in Development Stage! I

I'm Not A Bot #Left_TG 7 Nov 07, 2021
A Django app that integrates with Dramatiq.

django_dramatiq django_dramatiq is a Django app that integrates with Dramatiq. Requirements Django 1.11+ Dramatiq 0.18+ Example You can find an exampl

Bogdan Popa 261 Dec 25, 2022
Queuing with django celery and rabbitmq

queuing-with-django-celery-and-rabbitmq Install Python 3.6 or above sudo apt-get install python3.6 Install RabbitMQ sudo apt-get install rabbitmq-ser

1 Dec 22, 2021
A multiprocessing distributed task queue for Django

A multiprocessing distributed task queue for Django Features Multiprocessing worker pool Asynchronous tasks Scheduled, cron and repeated tasks Signed

Ilan Steemers 1.7k Jan 03, 2023
Full featured redis cache backend for Django.

Redis cache backend for Django This is a Jazzband project. By contributing you agree to abide by the Contributor Code of Conduct and follow the guidel

Jazzband 2.5k Jan 03, 2023
Django email backend with AWS SES and Celery

Django Celery SES Django Email Backend with Amazon Web Service SES and Celery, developed and used by StreetVoice. This packages provide a EmailBackend

StreetVoice 30 Oct 24, 2022
Dagon - An Asynchronous Task Graph Execution Engine

Dagon - An Asynchronous Task Graph Execution Engine Dagon is a job execution sys

8 Nov 17, 2022
Simple job queues for Python

Hypothesis Hypothesis is a family of testing libraries which let you write tests parametrized by a source of examples. A Hypothesis implementation the

RQ 8.7k Jan 07, 2023
a little task queue for python

a lightweight alternative. huey is: a task queue (2019-04-01: version 2.0 released) written in python (2.7+, 3.4+) clean and simple API redis, sqlite,

Charles Leifer 4.3k Jan 08, 2023
FastAPI with Celery

Minimal example utilizing fastapi and celery with RabbitMQ for task queue, Redis for celery backend and flower for monitoring the celery tasks.

Grega Vrbančič 371 Jan 01, 2023
A simple app that provides django integration for RQ (Redis Queue)

Django-RQ Django integration with RQ, a Redis based Python queuing library. Django-RQ is a simple app that allows you to configure your queues in djan

RQ 1.6k Dec 28, 2022
Mr. Queue - A distributed worker task queue in Python using Redis & gevent

MRQ MRQ is a distributed task queue for python built on top of mongo, redis and gevent. Full documentation is available on readthedocs Why? MRQ is an

Pricing Assistant 871 Dec 25, 2022
Pyramid configuration with celery integration. Allows you to use pyramid .ini files to configure celery and have your pyramid configuration inside celery tasks.

Getting Started Include pyramid_celery either by setting your includes in your .ini, or by calling config.include('pyramid_celery'): pyramid.includes

John Anderson 102 Dec 02, 2022
OpenQueue is a experimental CS: GO match system written in asyncio python.

What is OpenQueue OpenQueue is a experimental CS: GO match system written in asyncio python. Please star! This project was a lot of work & still has a

OpenQueue 10 May 13, 2022
SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis

SAQ SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis. It can be used for processing backgr

Toby Mao 117 Dec 30, 2022
Clearly see and debug your celery cluster in real time!

Clearly see and debug your celery cluster in real time! Do you use celery, and monitor your tasks with flower? You'll probably like Clearly! 👍 Clearl

Rogério Sampaio de Almeida 364 Jan 02, 2023
Sync Laravel queue with Python. Provides an interface for communication between Laravel and Python.

Python Laravel Queue Queue sync between Python and Laravel using Redis driver. You can process jobs dispatched from Laravel in Python. NOTE: This pack

Sinan Bekar 3 Oct 01, 2022
PostgreSQL-based Task Queue for Python

Procrastinate: PostgreSQL-based Task Queue for Python Procrastinate is an open-source Python 3.7+ distributed task processing library, leveraging Post

Procrastinate 486 Jan 08, 2023