Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
Astroquery is an astropy affiliated package that contains a collection of tools to access online Astronomical data.

Astroquery is an astropy affiliated package that contains a collection of tools to access online Astronomical data.

The Astropy Project 631 Jan 05, 2023
Traffic flow test platform, especially for reinforcement learning

Traffic Flow Test Platform Traffic flow test platform, especially for reinforcement learning, named TFTP. A traffic signal control framework that can

4 Nov 07, 2022
Tiling manager which runs on top of EWMH window managers.

PyTyle is an extremely versatile and extensible tiling manager that is meant to be used on top of EWMH window managers. Its feature set was modeled af

55 Jul 29, 2021
Absolute solvation free energy calculations with OpenFF and OpenMM

ABsolute SOLVantion Free Energy Calculations The absolv framework aims to offer a simple API for computing the change in free energy when transferring

7 Dec 07, 2022
Subnet calculator script using python

subnetCalculator Subnet calculator script using python3 Interactive Version Define the subnet variable interactively Use: subnetDict = subnetCalculato

1 Feb 15, 2022
Speed up your typing by some exercises in the multi-platform(Windows/Ubuntu).

Introduction This project purpose is speed up your typing by some exercises in the multi-platform(Windows/Ubuntu). Build Environment Software Environm

lyfer233 1 Mar 24, 2022
A joke conlang with minimal semantics

SyntaxLang Reserved Defined Words Word Function fo Terminates a noun phrase or verb phrase tu Converts an adjective block or sentence to a noun to Ter

Leo Treloar 1 Dec 07, 2021
Automated rop chain generation

This is the accompanying code to the blog post talking about automated rop chain generation. Build the test file with: make Install the dependencies:

Christopher Roberts 14 Nov 22, 2022
Height 2 LDraw With python

Height2Ldraw About This project aims to be able to make a full lego 3D model using the ldraw file format (.ldr) from a height and color map, currently

1 Dec 22, 2021
Checks for Vaccine Availability at your district and notifies you using E-mail, subscribe to our website.

Vaccine Availability Notifier Project Description Checks for Vaccine Availability at your district and notifies you using E-mail every 10 mins. Kindly

Farhan Hai Khan 19 Jun 03, 2021
Python plugin/extra to load data files from an external source (such as AWS S3) to a local directory

Data Loader Plugin - Python Table of Content (ToC) Data Loader Plugin - Python Table of Content (ToC) Overview References Python module Python virtual

Cloud Helpers 2 Jan 10, 2022
Bitflip Fault Simulation Platform by Daniele Rizzieri (2021)

BFSP [v1.05] Bitflip Fault Simulation Platform by Daniele Rizzieri (2021) The platform injects a random bitflip in each of N copies of a binary file.

Daniele Rizzieri 2 Nov 05, 2022
Live tracking, flight database and competition framework

SkyLines SkyLines is a web platform where pilots can share their flights with others after, or even during flight via live tracking. SkyLines is a sor

SkyLines 367 Dec 27, 2022
Repository, with small useful and functional applications

Repositorio,com pequenos aplicativos uteis e funcionais A ideia e ir deselvolvendo pequenos aplicativos funcionais e adicionar a este repositorio List

GabrielDuke 6 Dec 06, 2021
High-level bindings to the Valhalla framework.

Valhalla for Python This spin-off project simply offers improved Python bindings to the fantastic Valhalla project. Installation pip install valhalla

GIS • OPS 20 Dec 13, 2022
An easy-to-learn, dynamic, interpreted, procedural programming language

Gen Programming Language WARNING!! THIS LANGUAGE IS IN DEVELOPMENT. ANYTHING CAN CHANGE AT ANY MOMENT. Gen is a dynamic, interpreted, procedural progr

Gen Programming Language 7 Oct 17, 2022
Weakly-Divisable - Takes an interger and seee if it is weakly divisible by seven

Weakly Divisble Project by Diana Arce-Hernandez, Ryan McAlpine, and Rommel Ravan

Diana Arce-Hernandez 1 Jan 12, 2022
A napari plugin to inspect data within a cisTEM project

napari-cistem A plugin to inspect data within a cisTEM project This napari plugin was generated with Cookiecutter using with @napari's cookiecutter-na

Johannes Elferich 1 Nov 07, 2021
A Guide for Feature Engineering and Feature Selection, with implementations and examples in Python.

Feature Engineering & Feature Selection A comprehensive guide [pdf] [markdown] for Feature Engineering and Feature Selection, with implementations and

Yimeng.Zhang 968 Dec 29, 2022
Defichain maxi - Scripts to optimize performance on defichain rewards

defichain_maxi This script is made to optimize your defichain vault rewards by m

kuegi 75 Dec 31, 2022