Aiorq is a distributed task queue with asyncio and redis

Overview

👽 Aiorq

Introduction

Aiorq is a distributed task queue built on asyncio and Redis. It is a rewrite of arq that adds improvements and includes a web interface.

See documentation for more details.

Requirements

  • redis >= 5.0
  • aioredis>=1.1.0 <2.0.0

Install

pip install aiorq
pip install "aioredis>=1.1.0,<2.0.0"

Quick Start

Task Definition

Define the tasks, the startup/shutdown hooks, the cron job, and the worker settings in `tasks.py`:
# tasks.py
# -*- coding: utf-8 -*-

import asyncio
import os

from aiorq.connections import RedisSettings
from aiorq.cron import cron


async def say_hello(ctx, name) -> None:
    """Demo task: pause for five seconds, then print a greeting for ``name``.

    ``ctx`` is accepted as the first argument but is not used here.
    """
    delay_seconds = 5
    await asyncio.sleep(delay_seconds)
    greeting = f"Hello {name}"
    print(greeting)


async def say_hi(ctx, name) -> None:
    """Demo task: pause for three seconds, then print a short greeting for ``name``.

    ``ctx`` is accepted as the first argument but is not used here.
    """
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)
    greeting = f"Hi {name}"
    print(greeting)


async def startup(ctx):
    """Startup hook (registered as ``on_startup``): announce on stdout.

    ``ctx`` is accepted but not used here.
    """
    message = "starting... done"
    print(message)


async def shutdown(ctx):
    """Shutdown hook (registered as ``on_shutdown``): announce on stdout.

    ``ctx`` is accepted but not used here.
    """
    message = "ending... done"
    print(message)


async def run_cron(ctx, time_='2021-11-16 10:26:05'):
    """Cron demo coroutine: print ``time_`` to stdout.

    ``ctx`` is accepted but not used; ``time_`` falls back to a fixed
    sample timestamp string when the caller does not supply one.
    """
    stamp = time_
    print(stamp)


class WorkerSettings:
    """Worker configuration, referenced on the CLI as ``tasks.WorkerSettings``.

    Bundles the Redis connection settings, the task functions the worker
    serves, the startup/shutdown hooks, and the cron schedule.
    """

    # Connection parameters come from the environment with local defaults.
    # os.getenv returns a str whenever the variable is set, so the numeric
    # settings are cast to int to keep one type for both the default and
    # the environment-provided value.
    redis_settings = RedisSettings(
        host=os.getenv("REDIS_HOST", "127.0.0.1"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        database=int(os.getenv("REDIS_DATABASE", 0)),
        password=os.getenv("REDIS_PASSWORD", None)
    )

    # Task coroutines this worker can execute.
    functions = [say_hello, say_hi]

    # Hooks run when the worker starts and stops.
    on_startup = startup

    on_shutdown = shutdown

    # Scheduled jobs: run_cron at minute 40, second 50; its result is kept forever.
    cron_jobs = [
        cron(coroutine=run_cron, name="x100", minute=40, second=50, keep_result_forever=True)
    ]

    # Optional settings, disabled in this example:
    # allow_abort_jobs = True

    # worker_name = "ohuo"
    # queue_name = "ohuo"

Run aiorq worker

> aiorq tasks.WorkerSettings
15:08:50: Starting Queue: ohuo
15:08:50: Starting Worker: [email protected]
15:08:50: Starting Functions: say_hello, EnHeng
15:08:50: redis_version=5.0.10 mem_usage=731.12K clients_connected=2 db_keys=9
starting...

Integration in FastAPI

Create the Redis pool on application startup and expose the queue through HTTP endpoints in `main.py`:
# -*- coding: utf-8 -*-
import json
import os

from fastapi import FastAPI
from starlette.requests import Request

from aiorq.connections import RedisSettings, create_pool
from aiorq.jobs import Job

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    """Create the Redis pool on application startup and store it on ``app.state.redis``.

    Connection parameters are read from the environment with local
    defaults. ``os.getenv`` returns a ``str`` whenever the variable is
    set, so the numeric settings are cast to ``int`` to keep one type for
    both the default and the environment-provided value.
    """
    settings = RedisSettings(
        host=os.getenv("REDIS_HOST", "127.0.0.1"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        database=int(os.getenv("REDIS_DATABASE", 0)),
        password=os.getenv("REDIS_PASSWORD", None)
    )
    app.state.redis = await create_pool(settings)


@app.get("/get_health_check")
async def get_health_check(request: Request, worker_name):
    """Return the JSON-decoded health-check record for ``worker_name``."""
    pool = request.app.state.redis
    raw_record = await pool._get_health_check(worker_name=worker_name)
    decoded = json.loads(raw_record)
    return {"result": decoded}


@app.get("/enqueue_job_")
async def enqueue_job_(request: Request):
    """Enqueue the ``'qy_spider_'`` function on ``comment_queue`` and return the new job's info."""
    pool = request.app.state.redis
    queued = await pool.enqueue_job(
        'qy_spider_',
        _queue_name="comment_queue",
        _job_try=4,
    )
    details = await queued.info()
    return {"job_": details}


@app.get("/index")
async def index(request: Request):
    """Return summary counts: registered functions, workers, and stored job results."""
    pool = request.app.state.redis

    # Fetch everything first, in the same order as before, then count.
    raw_tasks = await pool.all_tasks()
    worker_list = await pool.all_workers()
    job_results = await pool.all_job_results()

    summary = {
        "functions_num": len(json.loads(raw_tasks)),  # tasks arrive JSON-encoded
        "workers_num": len(worker_list),
        "results_num": len(job_results),
    }
    return {"results": summary}


@app.get("/get_all_workers")
async def get_all_workers(request: Request):
    """Return every worker record, each decoded from its JSON form."""
    encoded_workers = await request.app.state.redis.all_workers()
    decoded_workers = list(map(json.loads, encoded_workers))
    return {"results": decoded_workers}


@app.get("/get_all_functions")
async def get_all_functions(request: Request):
    """Return the JSON-decoded result of the pool's ``all_tasks()`` call."""
    pool = request.app.state.redis
    raw_tasks = await pool.all_tasks()
    decoded_tasks = json.loads(raw_tasks)
    return {"results": decoded_tasks}


@app.get("/get_all_result")
async def get_all_result(request: Request, worker=None, task=None, job_id=None):
    """Return all job results, optionally narrowed by worker, task and/or job id.

    A filter is applied only when its query parameter is truthy; filters
    are applied in the order worker, task, job id.
    """
    matches = await request.app.state.redis.all_job_results()

    # (result key, requested value) pairs, in application order.
    criteria = (
        ("worker_name", worker),
        ("function", task),
        ("job_id", job_id),
    )
    for field, wanted in criteria:
        if not wanted:
            continue
        matches = [entry for entry in matches if entry.get(field) == wanted]

    return {"results_": matches}

@app.get("/queued_jobs")
async def queued_jobs(request: Request, queue_name="aiorq:queue"):
    """Return the jobs queued on ``queue_name``, each annotated with its current status.

    The status is stored on each job object under the ``state`` attribute.
    """
    pool = request.app.state.redis
    pending = await pool.queued_jobs(queue_name=queue_name)

    annotated = []
    for entry in pending:
        handle = Job(
            job_id=entry.__dict__.get("job_id"),
            redis=pool,
            _queue_name=queue_name,
        )
        entry.__dict__["state"] = await handle.status()
        annotated.append(entry)
    return {"queued_jobs": annotated}


# Job status lookup
@app.get("/job_status")
async def job_status(request: Request, job_id="12673208ee3b417192b7cce06844adda", _queue_name="aiorq:queue"):
    """Return the info record of job ``job_id`` on queue ``_queue_name``."""
    handle = Job(
        job_id=job_id,
        redis=request.app.state.redis,
        _queue_name=_queue_name,
    )
    details = await handle.info()
    return {"job_status_": details}


if __name__ == '__main__':
    # Executed directly as a script: serve ``main:app`` with uvicorn on
    # all interfaces, port 9999, with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        app='main:app',
        host="0.0.0.0",
        port=9999,
        reload=True,
    )

Thanks

License

MIT

The easiest way to automate your data

Hello, world! 👋 We've rebuilt data engineering for the data science era. Prefect is a new workflow management system, designed for modern infrastruct

Prefect 10.9k Jan 04, 2023
A simple scheduler tool that provides desktop notifications about classes and opens their meet links in the browser automatically at the start of the class.

This application provides desktop notifications about classes and opens their meet links in browser automatically at the start of the class.

Anshit 14 Jun 29, 2022
A calendaring app for Django. It is now stable, Please feel free to use it now. Active development has been taken over by bartekgorny.

Django-schedule A calendaring/scheduling application, featuring: one-time and recurring events calendar exceptions (occurrences changed or cancelled)

Tony Hauber 814 Dec 26, 2022
Python job scheduling for humans.

schedule Python job scheduling for humans. Run Python functions (or any other callable) periodically using a friendly syntax. A simple to use API for

Dan Bader 10.4k Jan 02, 2023
Remote task execution tool

Gunnery Gunnery is a multipurpose task execution tool for distributed systems with web-based interface. If your application is divided into multiple s

Gunnery 747 Nov 09, 2022
Crontab jobs management in Python

Plan Plan is a Python package for writing and deploying cron jobs. Plan will convert Python code to cron syntax. You can easily manage you

Shipeng Feng 1.2k Dec 28, 2022
Ffxiv-blended-job-icons - All action icons for each class/job are blended together to create new backgrounds for each job/class icon!

ffxiv-blended-job-icons All action icons for each class/job are blended together to create new backgrounds for each job/class icon! I used python to c

Jon Strutz 2 Jul 07, 2022
Clepsydra is a mini framework for task scheduling

Intro Clepsydra is a mini framework for task scheduling All parts are designed to be replaceable. Main ideas are: No pickle! Tasks are stored in reada

Andrey Tikhonov 15 Nov 04, 2022
A flexible python library for building your own cron-like system, with REST APIs and a Web UI.

Nextdoor Scheduler ndscheduler is a flexible python library for building your own cron-like system to schedule jobs, which is to run a tornado process

1k Dec 15, 2022
A Lightweight Cluster/Cloud VM Job Management Tool 🚀

Lightweight Cluster/Cloud VM Job Management 🚀 Are you looking for a tool to manage your training runs locally, on Slurm/Open Grid Engine clusters, SS

29 Dec 12, 2022
Python-Repeated-Timer is an open-source & highly performing timer using only standard-libraries.

Python Repeated Timer Python-Repeated-Timer is an open-source & highly performing timer using only standard-libraries.

TACKHYUN JUNG 3 Oct 09, 2022
Here is the live demonstration of endpoints and celery worker along with RabbitMQ

whelp-task Here is the live demonstration of endpoints and celery worker along with RabbitMQ Before running the application make sure that you have yo

Yalchin403 0 Nov 14, 2021
A Python concurrency scheduling library, compatible with asyncio and trio.

aiometer aiometer is a Python 3.6+ concurrency scheduling library compatible with asyncio and trio and inspired by Trimeter. It makes it easier to exe

Florimond Manca 182 Dec 26, 2022
Aiorq is a distributed task queue with asyncio and redis

Aiorq is a distributed task queue built on asyncio and Redis. It is a rewrite of arq that adds improvements and includes a web interface.

PY-GZKY 5 Mar 18, 2022
A powerful workflow engine implemented in pure Python

Spiff Workflow Summary Spiff Workflow is a workflow engine implemented in pure Python. It is based on the excellent work of the Workflow Patterns init

Samuel 1.3k Jan 08, 2023
CoSA: Scheduling by Constrained Optimization for Spatial Accelerators

CoSA is a scheduler for spatial DNN accelerators that generate high-performance schedules in one shot using mixed integer programming

UC Berkeley Architecture Research 44 Dec 13, 2022
generate HPC scheduler systems jobs input scripts and submit these scripts to HPC systems and poke until they finish

DPDispatcher DPDispatcher is a python package used to generate HPC(High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver) jobs in

DeepModeling 23 Nov 30, 2022
Another Scheduler is a Kubernetes controller that automatically starts, stops, or restarts pods from a deployment at a specified time using a cron annotation.

Another Scheduler Another Scheduler is a Kubernetes controller that automatically starts, stops, or restarts pods from a deployment at a specified tim

Diego Najar 66 Nov 19, 2022
A task scheduler with task scheduling, timing and task completion time tracking functions

A task scheduler with task scheduling, timing and task completion time tracking functions. Could be helpful for time management in daily life.

ArthurLCW 0 Jan 15, 2022
dragonscales is a highly customizable asynchronous job-scheduler framework

dragonscales 🐉 dragonscales is a highly customizable asynchronous job-scheduler framework. This framework is used to scale the execution of multiple

Sorcero 2 May 16, 2022