Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increases RPS (Requests Per Second) of your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes

Get started

A simple example of how to get started with Aqueduct using aiohttp. For more complete examples, see examples

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
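
Once the service is running, it can be queried like any other aiohttp application. Below is a minimal client sketch; the port 8080 is just the aiohttp default, and the input value 10 (with expected result 285) is purely illustrative.

import asyncio

from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        # SumView reads the raw request body, so the number is sent as plain text
        async with session.post('http://localhost:8080/sum', data='10') as resp:
            print(await resp.json())  # {'result': 285}


if __name__ == '__main__':
    asyncio.run(main())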
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is needed just for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no results are set yet
assert not any(task.result for task in tasks_batch)
# batch handling takes about 0.16 s, which is less than the 0.22 s of sequential task processing
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is greater than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
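
A Flow is not limited to a single step: judging by the constructor used above, several FlowSteps can be passed to Flow, with each handler running in its own worker process. The sketch below is a hedged illustration of such a multi-step pipeline that reuses ArrayFieldTask, TASKS_BATCH_SIZE, get_tasks_batch and process_tasks from the example above; PreprocessHandler and DetectorFromArrayHandler are illustrative names, not part of the library.

class PreprocessHandler(BaseTaskHandler):
    """Illustrative first step: normalizes the task's array in place."""
    def handle(self, *tasks: ArrayFieldTask):
        for task in tasks:
            task.array = task.array / 255.0


class DetectorFromArrayHandler(BaseTaskHandler):
    """Illustrative second step: writes a dummy prediction into the task."""
    def handle(self, *tasks: ArrayFieldTask):
        for task in tasks:
            task.result = bool(task.array.mean() > 0.5)


multi_step_flow = Flow(
    FlowStep(PreprocessHandler()),
    FlowStep(DetectorFromArrayHandler(), batch_size=TASKS_BATCH_SIZE, batch_timeout=0.01),
)
multi_step_flow.start()

# processed exactly like in the batching example above
await process_tasks(multi_step_flow, get_tasks_batch())
await multi_step_flow.stop()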

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') == 'True':
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
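
With this setup, the integration is enabled by setting the SENTRY_ENABLED environment variable to 'True' and providing your project DSN in SENTRY_DSN before starting the service.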