Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
BlueBorne Dockerized

BlueBorne Dockerized This is the repo to reproduce the BlueBorne kill-chain on Dockerized Android as described here, to fully understand the code you

SecSI 5 Sep 14, 2022
An easy FASTA object handler, reader, writer and translator for small to medium size projects without dependencies.

miniFASTA An easy FASTA object handler, reader, writer and translator for small to medium size projects without dependencies. Installation Using pip /

Jules Kreuer 3 Jun 30, 2022
💻 Algo-Phantoms-Backend is an Application that provides pathways and quizzes along with a code editor to help you towards your DSA journey.📰🔥 This repository contains the REST APIs of the application.✨

Algo-Phantom-Backend 💻 Algo-Phantoms-Backend is an Application that provides pathways and quizzes along with a code editor to help you towards your D

Algo Phantoms 44 Nov 15, 2022
プレヤフHackUチーム「キャット・タン」が作成したアプリ「illustection」

cat_tongue_illustection プレヤフHackUチーム「キャット・タン」が作成した, プライバシー保護アプリ「illustection」です! デモ動画 https://youtu.be/z3I7LuB_i58 機能 アップロードされた画像をいい感じのイラストやの素材に置き換える(

4 Jul 03, 2021
A python script to decrypt media files encrypted using the Android application 'Secret Calculator Photo Vault'. Supports brute force of PIN also.

A python script to decrypt media files encrypted using the Android application 'Secret Calculator Photo Vault'. Supports brute force of PIN also.

3 May 01, 2022
This tool helps you to reverse any regex and gives you the opposite/allowed Letters,numerics and symbols.

Regex-Reverser This tool helps you to reverse any regex and gives you the opposite/allowed Letters,numerics and symbols. Screenshots Usage/Examples py

x19 0 Jun 02, 2022
TB Set color display - Add-on for Blender to set multiple objects and material Display Color at once.

TB_Set_color_display Add-on for Blender with operations to transfer name between object, data, materials and action names Set groups of object's or ma

1 Jun 01, 2022
A software dedicated to automaticaly select the agent of your desire in Valorant

AUTOPICKER A software dedicated to automaticaly select the agent of your desire in Valorant GUIDE Before stariting to use this program check if you ha

p1n00 0 Sep 24, 2022
Acesse seus investimentos da NuInvest pelo Python (Experimental)

Acesse seus investimentos da NuInvest pelo Python (Experimental)

André Roggeri Campos 5 Dec 06, 2022
Cup Noodle Vending Maching Ordering Queue

Noodle-API Cup Noodle Vending Machine Ordering Queue Install dependencies in virtual environment python3 -m venv

Jonas Kazlauskas 1 Dec 09, 2021
Basic infrastructure for writing scripts in Python

Base Script Python is an excellent language that makes writing scripts very straightforward. Over the course of writing many scripts, we realized that

Deep Compute, LLC 9 Jan 07, 2023
A simple script for generating screenshots with Vapoursynth

Vapoursynth-Screenshots A simple script for generating screenshots with Vapoursynth. About I'm lazy, and hate changing variables for each batch of scr

7 Dec 31, 2022
Automatically give thanks to Pypi packages you use in your project!

Automatically give thanks to Pypi packages you use in your project!

Ward 25 Dec 20, 2021
AminoAutoRegFxck/AutoReg For AminoApps.com

AminoAutoRegFxck AminoAutoRegFxck/AutoReg For AminoApps.com Termux apt update -y apt upgrade -y pkg install python git clone https://github.com/LilZev

3 Jan 18, 2022
Module to align code with thoughts of users and designers. Also magically handles navigation and permissions.

This readme will introduce you to Carteblanche and walk you through an example app, please refer to carteblanche-django-starter for the full example p

Eric Neuman 42 May 28, 2021
This module extends twarc to allow you to print out tweets as text for easy testing on the command line

twarc-text This module extends twarc to allow you to print out tweets as text for easy testing on the command line. Maybe it's useful for spot checkin

Documenting the Now 2 Oct 12, 2021
This repository holds those infrastructure-level modules, that every application requires that follows the core 12-factor principles.

py-12f-common About This repository holds those infrastructure-level modules, that every application requires that follows the core 12-factor principl

Tamás Benke 1 Dec 15, 2022
A Python Perforce package that doesn't bring in any other packages to work.

P4CMD 🌴 A Python Perforce package that doesn't bring in any other packages to work. Relies on p4cli installed on the system. p4cmd The p4cmd module h

Niels Vaes 13 Dec 19, 2022
Create N Share is a No Code solution which gives users the ability to create any type of feature rich survey forms with ease.

create n share Note : The Project Scaffold will be pushed soon. Create N Share is a No Code solution which gives users the ability to create any type

Chiraag Kakar 11 Dec 03, 2022
This project is about for notifying moderators about uploaded photos on server.

This project is about for notifying moderators (people who moderate data from photos) about uploaded photos on server.

1 Nov 24, 2021