Pyramid configuration with celery integration. Allows you to use pyramid .ini files to configure celery and have your pyramid configuration inside celery tasks.

Overview

Getting Started

Include pyramid_celery either by setting your includes in your .ini, or by calling config.include('pyramid_celery'):

pyramid.includes = pyramid_celery

Then you just need to tell pyramid_celery what ini file your [celery] section is in:

config.configure_celery('development.ini')

You are then free to use celery, for example with a class-based task:

from pyramid_celery import celery_app as app

class AddTask(app.Task):
    def run(self, x, y):
        print(x + y)

or decorator based:

from pyramid_celery import celery_app as app

@app.task
def add(x, y):
    print(x + y)

The pyramid registry is available inside tasks as app.conf['PYRAMID_REGISTRY'], so you can read your pyramid settings from its settings attribute.

Configuration

By default, pyramid_celery assumes you want to configure celery through ini settings. You can do this by calling config.configure_celery('development.ini'). If you are already in the main function of your application and want to reuse the ini file that configured the app, you can do the following:

config.configure_celery(global_config['__file__'])

If you want to use the standard celeryconfig python file instead, set use_celeryconfig = True like this:

[celery]
use_celeryconfig = True

More information about celeryconfig.py is available here:

http://celery.readthedocs.io/en/latest/userguide/configuration.html

An example ini configuration looks like this:

[celery]
broker_url = redis://localhost:1337/0
imports = app1.tasks
          app2.tasks

[celery:broker_transport_options]
visibility_timeout = 18000
max_retries = 5

[celerybeat:task1]
task = app1.tasks.Task1
type = crontab
schedule = {"minute": 0}

You'll notice that configuration options that are dictionaries or have multiple values are split into their own sections.
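
To make the splitting concrete, here is a minimal sketch that reads the example above with Python's standard configparser and rebuilds a nested settings dict. It is an illustration only, not pyramid_celery's actual loader: the helper name load_celery_settings is hypothetical, and all values are left as strings.

```python
import configparser

EXAMPLE_INI = """
[celery]
broker_url = redis://localhost:1337/0
imports = app1.tasks
          app2.tasks

[celery:broker_transport_options]
visibility_timeout = 18000
max_retries = 5
"""


def load_celery_settings(ini_text):
    """Hypothetical helper: rebuild a settings dict from the [celery] sections."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(ini_text)

    settings = dict(parser["celery"])
    # A multi-line value such as `imports` becomes a list, one item per line.
    settings["imports"] = settings["imports"].split()

    # Each [celery:<option>] section becomes a nested dict stored under <option>.
    for section in parser.sections():
        if section.startswith("celery:"):
            option = section.split(":", 1)[1]
            settings[option] = dict(parser[section])
    return settings


settings = load_celery_settings(EXAMPLE_INI)
```

In this sketch, settings["imports"] is a list of two module names and settings["broker_transport_options"] is a dict built from the separate section.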

Scheduled/Periodic Tasks

To use celerybeat (periodic tasks) you need to declare one celerybeat config section per task. The options are:

  • task - The python task you need executed.
  • type - The type of scheduling your configuration uses. Options are crontab, timedelta, and integer.
  • schedule - The actual schedule for your type of configuration.
  • args - Additional positional arguments.
  • kwargs - Additional keyword arguments.

Example configuration for this:

[celerybeat:task1]
task = app1.tasks.Task1
type = crontab
schedule = {"minute": 0}

[celerybeat:task2]
task = app1.tasks.Task2
type = timedelta
schedule = {"seconds": 30}
args = [16, 16]

[celerybeat:task3]
task = app2.tasks.Task1
type = crontab
schedule = {"hour": 0, "minute": 0}
kwargs = {"boom": "shaka"}

[celerybeat:task4]
task = myapp.tasks.Task4
type = integer
schedule = 30
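
As a rough sketch of how the type and schedule options relate, the snippet below turns each pair into a Python value using only the standard library. The function name parse_schedule is hypothetical and this is not pyramid_celery's own code. For crontab it only decodes the keyword arguments, which with celery installed could be passed to celery.schedules.crontab.

```python
import json
from datetime import timedelta


def parse_schedule(schedule_type, raw_schedule):
    """Hypothetical helper mapping a celerybeat type/schedule pair to a value."""
    value = json.loads(raw_schedule)
    if schedule_type == "timedelta":
        # {"seconds": 30} -> timedelta(seconds=30)
        return timedelta(**value)
    if schedule_type == "integer":
        # A plain number of seconds between runs.
        return int(value)
    if schedule_type == "crontab":
        # Keyword arguments suitable for celery.schedules.crontab(**value).
        return value
    raise ValueError("unknown schedule type: %r" % schedule_type)


every_30s = parse_schedule("timedelta", '{"seconds": 30}')
interval = parse_schedule("integer", "30")
midnight = parse_schedule("crontab", '{"hour": 0, "minute": 0}')
```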

A gotcha to watch out for is that the date/time in scheduled tasks is UTC by default. If you want to schedule a task for an exact date/time in your local timezone, you need to set timezone. Documentation for that can be found here:

http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#time-zones

If you need to find out what timezones are available you can do the following:

from pprint import pprint
from pytz import all_timezones
pprint(all_timezones)

Worker Execution

The celerybeat worker reads your configuration and puts tasks on the queue to be executed at the defined times. This means that if you are using celerybeat you will end up running two workers:

$ celery -A pyramid_celery.celery_app worker --ini development.ini
$ celery -A pyramid_celery.celery_app beat --ini development.ini

The first command is the standard worker command that will read messages off of the queue and run the task. The second command will read the celerybeat configuration and periodically schedule tasks on the queue.

Routing

If you would like to route a task to a specific queue, you can define a route per task by declaring its queue and/or routing_key in a celeryroute section.

An example configuration for this:

[celeryroute:otherapp.tasks.Task3]
queue = slow_tasks
routing_key = turtle

[celeryroute:myapp.tasks.Task1]
queue = fast_tasks
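
The sections above can be pictured as a per-task routing table. The following sketch uses the standard configparser to collect them into a dict keyed by task name. The helper name collect_routes is an assumption made for illustration and is not part of pyramid_celery.

```python
import configparser

ROUTES_INI = """
[celeryroute:otherapp.tasks.Task3]
queue = slow_tasks
routing_key = turtle

[celeryroute:myapp.tasks.Task1]
queue = fast_tasks
"""


def collect_routes(ini_text):
    """Hypothetical helper: map each task name to its routing options."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(ini_text)

    prefix = "celeryroute:"
    routes = {}
    for section in parser.sections():
        if section.startswith(prefix):
            # The task name is whatever follows the "celeryroute:" prefix.
            routes[section[len(prefix):]] = dict(parser[section])
    return routes


routes = collect_routes(ROUTES_INI)
```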

Running the worker

To run the worker we just use the standard celery command with an additional argument:

celery worker -A pyramid_celery.celery_app --ini development.ini

If you've defined variables in your .ini like %(database_username)s, you can use the --ini-var argument, which is a comma-separated list of key=value pairs:

celery worker -A pyramid_celery.celery_app --ini development.ini --ini-var=database_username=sontek,database_password=OhYeah!

The values in --ini-var cannot contain spaces, because this will break celery's parser.

It is a comma-separated list, rather than a repeated --ini-var flag, because of a bug in celery itself. When that bug is fixed the API will be reworked. The ticket is here:

https://github.com/celery/celery/pull/2435
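
To illustrate what the comma-separated --ini-var value amounts to, the sketch below splits such a string into key/value pairs and uses them as interpolation defaults with the standard configparser. The function parse_ini_var and the [app:main] section with its sqlalchemy.url key are hypothetical; pyramid_celery may implement this differently.

```python
import configparser


def parse_ini_var(raw):
    """Hypothetical helper: 'a=1,b=2' -> {'a': '1', 'b': '2'}."""
    pairs = (item.split("=", 1) for item in raw.split(","))
    return {key: value for key, value in pairs}


# Hypothetical ini content that refers to the variables by name.
INI_TEXT = """
[app:main]
sqlalchemy.url = postgresql://%(database_username)s:%(database_password)s@localhost/app
"""

variables = parse_ini_var("database_username=sontek,database_password=OhYeah!")

# Passing the variables as defaults lets %(name)s placeholders resolve.
parser = configparser.ConfigParser(defaults=variables)
parser.read_string(INI_TEXT)
url = parser["app:main"]["sqlalchemy.url"]
```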

If you use the celerybeat scheduler, run with the --beat flag to start beat and the worker at the same time:

celery worker --beat -A pyramid_celery.celery_app --ini development.ini

Or you can launch it separately like this:

celery beat -A pyramid_celery.celery_app --ini development.ini

Logging

If you use the .ini configuration (i.e., you don't use celeryconfig.py), then the logging configuration will be loaded from the .ini and will not use the default celery loggers.

You most likely want to add a logging section to your ini for celery as well:

[logger_celery]
level = INFO
handlers =
qualname = celery

and then update your [loggers] section to include it.
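
Putting those pieces together, a complete logging setup might look like the sketch below, which loads an ini fragment through the standard library's logging.config.fileConfig. The handler and formatter names (console, generic) and the root logger level are assumptions chosen for the example; only the [logger_celery] section comes from the text above.

```python
import configparser
import logging
import logging.config

LOGGING_INI = """
[loggers]
keys = root, celery

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARNING
handlers = console

[logger_celery]
level = INFO
handlers =
qualname = celery

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s
"""

# fileConfig accepts an already-populated parser as well as a file name.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(LOGGING_INI)
logging.config.fileConfig(parser, disable_existing_loggers=False)

# The celery logger has no handlers of its own; its records propagate
# up to the root logger's console handler.
celery_logger = logging.getLogger("celery")
```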

If you want to use the default celery loggers, you can set CELERYD_HIJACK_ROOT_LOGGER=True in the [celery] section of your .ini.

Celery worker processes do not propagate exceptions raised inside tasks; by default they swallow them silently. This is related to how asynchronous task results are read back. To see whether your tasks fail, you might need to configure the celery.worker.job logger to propagate exceptions:

# Make sure Celery worker doesn't silently swallow exceptions
# See http://stackoverflow.com/a/20719461/315168
# https://github.com/celery/celery/issues/2437
[logger_celery_worker_job]
level = ERROR
handlers =
qualname = celery.worker.job
propagate = 1

Demo

To see it all in action check out examples/long_running_with_tm, run redis-server and then do:

$ python setup.py develop
$ populate_long_running_with_tm development.ini
$ pserve ./development.ini
$ celery worker -A pyramid_celery.celery_app --ini development.ini

Owner

John Anderson