Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
A programming language that for tech savvy graphic designers

Microsoft Hackathon - PhoTex Idea A programming language that allows tech savvy graphic designers develop scalable vector graphics using plain text co

Joe Furfaro 5 Nov 14, 2021
Script that creates graphical representations of Julia an Mandelbrot sets.

Julia and Mandelbrot Picture Maker This simple functions create simple plots of the Julia and Mandelbrot sets. The Julia set require the important par

Juan Riera Gomez 1 Jan 10, 2022
Listen Surah, prepare for next and Endless life...

Al-Quran In this repository, I have linked up all Surah with Arabic-Bangla Audio From Youtube. So, you just need to choose and listen. and the ( surah

SpiderX 1 Sep 30, 2022
chiarose(XCR) based on chia(XCH) source code fork, open source public chain

chia-rosechain 一个无耻的小活动 | A shameless little event 如果您喜欢这个项目,请点击star 将赠送您520朵玫瑰,可以去 facebook 留下您的(xcr)地址,和github用户名。 If you like this project, please

ddou123 376 Dec 14, 2022
Minecraft Multi-Server Pinger Discord Embed

Minecraft Network Pinger Minecraft Multi-Server Pinger Discord Embed What does this bot do? It sends an embed and uses mcsrvstat API and checks if the

YungHub 2 Jan 05, 2022
Python implementation for Active Directory certificate abuse

Certipy is a Python tool to enumerate and abuse misconfigurations in Active Directory Certificate Services (AD CS). Based on the C# variant Ce

Oliver Lyak 1.3k Jan 09, 2023
Tools for downloading and processing numerical weather predictions

NWP Tools for downloading and processing numerical weather predictions At the moment, this code is focused on downloading historical UKV NWPs produced

Open Climate Fix 6 Nov 24, 2022
Probably the best way to simulate block scopes in Python

This is a package, as it says on the tin, to emulate block scoping in Python, the lack of which being a clever design choice yet sometimes a trouble.

88 Oct 26, 2022
SuperMario - Python programming class ending assignment SuperMario, using pygame

SuperMario - Python programming class ending assignment SuperMario, using pygame

mars 2 Jan 04, 2022
Rotating cube with hand

I am still working on this project :)) To-Do(Present): = It needs an algorithm to fine tune my hand's coordinates for rotation of our cube (initial o

Jay Desale 2 Dec 26, 2021
Rename and categorize your DMOJ solutions

DMOJ Downloader What is this for? DMOJ lets you download the code for all your solutions, however the files are just named as numbers

Evan Wild 1 Dec 04, 2022
PressurePlate is a multi-agent environment that requires agents to cooperate during the traversal of a gridworld.

PressurePlate is a multi-agent environment that requires agents to cooperate during the traversal of a gridworld. The grid is partitioned into several rooms, and each room contains a plate and a clos

Autonomous Agents Research Group (University of Edinburgh) 6 Dec 03, 2022
A python program, imitating functionalities of a banking system

A python program, imitating functionalities of a banking system, in order for users to perform certain operations in a bank.

Moyosore Weke 1 Nov 26, 2021
WorldsCollide - Final Fantasy VI Randomizer

FFVI Worlds Collide Worlds Collide is an open worlds randomizer for Final Fantas

8 Jun 13, 2022
Runs macOS on linux with qemu.

mac-on-linux-with-qemu Runs macOS on linux with qemu. Pre-requisites qemu-system-x86_64 dmg2img pulseaudio python[click] Usage After cloning the repos

Arindam Das 177 Dec 26, 2022
Small tool to use hero .json files created with Optolith for The Dark Eye/ Das Schwarze Auge 5 to perform talent probes.

DSA5-ProbeMaker A little tool for The Dark Eye 5th Edition (Das Schwarze Auge 5) to load .json from Optolith character generation and easily perform t

2 Jan 06, 2022
Projeto-menu - This project is designed to learn more about control mechanisms in Python programming

Projeto-menu - This project is designed to learn more about control mechanisms in Python programming

Henrik Ricarte 2 Mar 01, 2022
Automatically give thanks to Pypi packages you use in your project!

Automatically give thanks to Pypi packages you use in your project!

Ward 25 Dec 20, 2021
GWCelery is a simple and reliable package for annotating and orchestrating LIGO/Virgo alerts

GWCelery is a simple and reliable package for annotating and orchestrating LIGO/Virgo alerts, built from widely used open source components.

Min-A Cho Zeno 1 Nov 02, 2021
🐍 A Python lib for (de)serializing Python objects to/from JSON

Turn Python objects into dicts or (json)strings and back No changes required to your objects Easily customizable and extendable Works with dataclasses

Ramon Hagenaars 253 Dec 14, 2022