SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis

Overview

SAQ

SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis. It can be used for processing background jobs with workers. For example, you could use SAQ to schedule emails, execute long queries, or do expensive data analysis.

It uses aioredis >= 2.0.

It is similar to RQ and heavily inspired by ARQ. Unlike RQ, it is async and thus significantly faster if your jobs are async. Even if they are not, SAQ is still considerably faster due to lower overhead.

SAQ optionally comes with a simple UI for monitor workers and jobs.

SAQ Web UI

Install

# minimal install
pip install saq

# web + hiredis
pip install saq[web,hiredis]

Usage

usage: saq [-h] [--workers WORKERS] [--verbose] [--web] settings

Start Simple Async Queue Worker

positional arguments:
  settings           Namespaced variable containing worker settings eg: eg module_a.settings

options:
  -h, --help         show this help message and exit
  --workers WORKERS  Number of worker processes
  --verbose, -v      Logging level: 0: ERROR, 1: INFO, 2: DEBUG
  --web              Start web app

Example

import asyncio

from saq import Queue

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def startup(ctx):
    await ctx["db"] = create_db()

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}

To start the worker, assuming the previous is available in the python path

saq module.file.settings

To enqueue jobs

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.results)

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)

Demo

Start the worker

saq examples.simple.settings --web

Navigate to the web ui

Enqueue jobs

python examples/simple.py

Comparison to ARQ

SAQ is heavily inspired by ARQ but has several enhancements.

  1. Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY
    1. SAQ has much lower latency than ARQ, with delays of < 5ms. ARQ's default polling frequency is 0.5 seconds
    2. SAQ is up to 8x faster than ARQ
  2. Web interface for monitoring queues and workers
  3. Heartbeat monitor for abandoned jobs
  4. More robust failure handling
    1. Storage of stack traces
    2. Sweeping stuck jobs
    3. Handling of cancelled jobs different from failed jobs (machine redeployments)
  5. Before and after job hooks
  6. Easily run multiple workers to leverage more cores

Development

python -m venv env
source env/bin/activate
pip install -e .[dev,web]
docker run -p 6379:6379 redis
./run_checks.sh
Owner
Toby Mao
Toby Mao
PostgreSQL-based Task Queue for Python

Procrastinate: PostgreSQL-based Task Queue for Python Procrastinate is an open-source Python 3.7+ distributed task processing library, leveraging Post

Procrastinate 486 Jan 08, 2023
Clearly see and debug your celery cluster in real time!

Clearly see and debug your celery cluster in real time! Do you use celery, and monitor your tasks with flower? You'll probably like Clearly! 👍 Clearl

Rogério Sampaio de Almeida 364 Jan 02, 2023
Asynchronous tasks in Python with Celery + RabbitMQ + Redis

python-asynchronous-tasks Setup & Installation Create a virtual environment and install the dependencies: $ python -m venv venv $ source env/bin/activ

Valon Januzaj 40 Dec 03, 2022
Queuing with django celery and rabbitmq

queuing-with-django-celery-and-rabbitmq Install Python 3.6 or above sudo apt-get install python3.6 Install RabbitMQ sudo apt-get install rabbitmq-ser

1 Dec 22, 2021
Sync Laravel queue with Python. Provides an interface for communication between Laravel and Python.

Python Laravel Queue Queue sync between Python and Laravel using Redis driver. You can process jobs dispatched from Laravel in Python. NOTE: This pack

Sinan Bekar 3 Oct 01, 2022
A fully-featured e-commerce application powered by Django

kobbyshop - Django Ecommerce App A fully featured e-commerce application powered by Django. Sections Project Description Features Technology Setup Scr

Kwabena Yeboah 2 Feb 15, 2022
Asynchronous serverless task queue with timed leasing of tasks

Asynchronous serverless task queue with timed leasing of tasks. Threaded implementations for SQS and local filesystem.

24 Dec 14, 2022
Add you own metrics to your celery backend

Add you own metrics to your celery backend

Gandi 1 Dec 16, 2022
A fast and reliable background task processing library for Python 3.

dramatiq A fast and reliable distributed task processing library for Python 3. Changelog: https://dramatiq.io/changelog.html Community: https://groups

Bogdan Popa 3.4k Jan 01, 2023
RQ (Redis Queue) integration for Flask applications

Flask-RQ RQ (Redis Queue) integration for Flask applications Resources Documentation Issue Tracker Code Development Version Installation $ pip install

Matt Wright 205 Nov 06, 2022
Pyramid configuration with celery integration. Allows you to use pyramid .ini files to configure celery and have your pyramid configuration inside celery tasks.

Getting Started Include pyramid_celery either by setting your includes in your .ini, or by calling config.include('pyramid_celery'): pyramid.includes

John Anderson 102 Dec 02, 2022
FastAPI with Celery

Minimal example utilizing fastapi and celery with RabbitMQ for task queue, Redis for celery backend and flower for monitoring the celery tasks.

Grega Vrbančič 371 Jan 01, 2023
Mr. Queue - A distributed worker task queue in Python using Redis & gevent

MRQ MRQ is a distributed task queue for python built on top of mongo, redis and gevent. Full documentation is available on readthedocs Why? MRQ is an

Pricing Assistant 871 Dec 25, 2022
Beatserver, a periodic task scheduler for Django 🎵

Beat Server Beatserver, a periodic task scheduler for django channels | beta software How to install Prerequirements: Follow django channels documenta

Raja Simon 130 Dec 17, 2022
Django database backed celery periodic task scheduler with support for task dependency graph

Djag Scheduler (Dj)ango Task D(AG) (Scheduler) Overview Djag scheduler associates scheduling information with celery tasks The task schedule is persis

Mohith Reddy 3 Nov 25, 2022
Dagon - An Asynchronous Task Graph Execution Engine

Dagon - An Asynchronous Task Graph Execution Engine Dagon is a job execution sys

8 Nov 17, 2022
SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis

SAQ SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis. It can be used for processing backgr

Toby Mao 117 Dec 30, 2022
Simple job queues for Python

Hypothesis Hypothesis is a family of testing libraries which let you write tests parametrized by a source of examples. A Hypothesis implementation the

RQ 8.7k Jan 07, 2023
Py_extract is a simple, light-weight python library to handle some extraction tasks using less lines of code

py_extract Py_extract is a simple, light-weight python library to handle some extraction tasks using less lines of code. Still in Development Stage! I

I'm Not A Bot #Left_TG 7 Nov 07, 2021
Accept queue automatically on League of Legends.

Accept queue automatically on League of Legends. I was inspired by the lucassmonn code accept-queue-lol-telegram, and I modify it according to my need

2 Sep 06, 2022