Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
The repository is about 100+ python programming exercise problem discussed, explained, and solved in different ways

Break The Ice With Python A journey of 100+ simple yet interesting problems which are explained, solved, discussed in different pythonic ways Introduc

Abdullah Al Masud Tushar 2.2k Jan 04, 2023
Simple Denial of Service Program yang di bikin menggunakan bahasa pemograman Python,

Peringatan Tujuan kami share code Indo-DoS hanya untuk bertujuan edukasi / pembelajaran! Dilarang memperjual belikan source ini / memperjual-belikan s

SonLyte 8 Nov 07, 2021
Яндекс тренировки по алгоритмам. Июнь 2021

Young&&Yandex Тренировки по алгоритмам Если вы хотите попасть на летнюю стажировку в Яндекс, но пока не уверены в своих силах, приходите на наши трени

Podlevskiy Viktor 6 Sep 03, 2021
Trusted sessions for falcon using itsdangerous.

Falcon signed sessions This project allows you to easily add trusted cookies to falcon, it works by storing a signed cookie in the client's browser us

Ward 1 Feb 08, 2022
My solutions for Advent of Code 2021 🌟🎄

🌟 Advent of Code 2021 🎄 My solutions for Advent of Code 2021. About · What is Advent of Code? · Contents · Usage · Table of puzzles (TODO: add final

Amanda P. Pinha 2 Dec 05, 2022
List Less Than Ten with python

List Less Than Ten with python

PyLaboratory 0 Feb 07, 2022
An upgraded version of extractJS

extractJS_2.0 An enhanced version of extractJS with even more functionality Features Discover JavaScript files directly from the webpage Customizable

Ali 4 Dec 21, 2022
Hands-on machine learning workshop

emb-ntua-workshop This workshop discusses introductory concepts of machine learning and data mining following a hands-on approach using popular tools

ISSEL Soft Eng Team 12 Oct 30, 2022
Hartree-Fock Workshop for the Han-sur-Lesse Winterschool of 2021

Hartree-Fock course for the Han-sur-Lesse Winterschool of 2021 Requirements For going through these exercises, please install the Anaconda suite. Next

Ivo Filot 2 Nov 16, 2022
A companion web application to connect stash to deovr

stash-vr-companion This is a companion web application to connect stash to deovr. Stash is a self hosted web application to manage your porn collectio

19 Sep 29, 2022
A module comment generator for python

Module Comment Generator The comment style is as a tribute to the comment from the RA . The comment generator can parse the ast tree from the python s

飘尘 1 Oct 21, 2021
Movie recommend community

README 0. 초록 1) 목적 사용자의 Needs를 기반으로 영화를 추천해주는 커뮤니티 서비스 구현 2) p!ck 서비스란? "pick your taste!" 취향대로 영화 플레이리스트(이하 서비스 내에서의 명칭인 '바스켓'이라 함)를 만들고, 비슷한 취향을 가진

2 Dec 08, 2021
Easily Generate Revolut Business Cards

RevBusinessCardGen Easily Generate Revolut Business Cards Prerequisites Before you begin, ensure you have met the following requirements: You have ins

Younes™ 35 Dec 14, 2022
Tools for downloading and processing numerical weather predictions

NWP Tools for downloading and processing numerical weather predictions At the moment, this code is focused on downloading historical UKV NWPs produced

Open Climate Fix 6 Nov 24, 2022
Malicious Document IoC Extractor is a collection of scripts that helps extracting IoCs from various maldoc families.

MDIExtractor Malicious Document IoC Extractor (MDIExtractor) is a collection of scripts that helps extracting IoCs from various maldoc families. Prere

Malwrologist 14 Nov 25, 2022
A simple flashcard app built as a final project for a databases class.

CS2300 Final Project - Flashcard app 'FlashStudy' Tech stack Backend Python (Language) Django (Web framework) SQLite (Database) Frontend HTML/CSS/Java

Christopher Spencer 2 Feb 03, 2022
API to summarize input text

summaries API to summarize input text normal run $ docker-compose exec web python -m pytest disable warnings $ docker-compose exec web python -m pytes

Brad 1 Sep 08, 2021
Chat meetup

FLiP-Meetup-Chat Chat meetup create function bin/pulsar-admin functions create --auto-ack true --jar pulsardjlexample-1.0.jar --classname "dev.pulsarf

Timothy Spann 1 Dec 09, 2021
Low-level Python CFFI Bindings for Argon2

Low-level Python CFFI Bindings for Argon2 argon2-cffi-bindings provides low-level CFFI bindings to the Argon2 password hashing algorithm including a v

Hynek Schlawack 4 Dec 15, 2022
Exam assignment for Laboratory of Bioinformatics 2

Exam assignment for Laboratory of Bioinformatics 2 (Alma Mater University of Bologna, Master in Bioinformatics)

2 Oct 22, 2022