Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes (see the sketch after this list)
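
A rough picture of the shared-memory transfer, using only the standard library and numpy. This is a minimal sketch of the mechanism, not aqueduct's internal API (the library handles this for you):

import numpy as np
from multiprocessing import shared_memory

# put an array into a named shared-memory block
arr = np.arange(10, dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
shared_arr[:] = arr

# another process could attach to the block by name and read it without copying
attached = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(arr.shape, dtype=np.int64, buffer=attached.buf)
print(view.sum())  # 45

attached.close()
shm.close()
shm.unlink()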

Get started

A simple example of how to get started with aqueduct using aiohttp. For more complete examples, see the examples directory.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is a CPU-bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to the model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap your model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in the child process, so the model does not consume memory in the parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """Receives a list of tasks because batching is supported."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
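
Once the service is running (aiohttp listens on port 8080 by default), you can try the endpoint with a small client. The snippet below is just a sketch for checking the service; it is not part of the aqueduct examples:

import asyncio

from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        # POST the raw number; the view parses it with int() and responds with the sum of squares
        async with session.post('http://localhost:8080/sum', data=b'10') as resp:
            print(await resp.json())  # {'result': 285}


asyncio.run(main())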
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is needed only for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of a cat in an image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no results have been set yet
assert not any(task.result for task in tasks_batch)
# batch handling takes ~0.16 s (0.01 * 20 * 0.7 + 0.02), which is less than the ~0.22 s (0.01 * 20 + 0.02) for sequential processing
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is larger than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

Aqueduct allows you to receive logger events from both the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


# os.getenv returns a string, so compare against the string value rather than True
if os.getenv('SENTRY_ENABLED') == 'True':
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
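
raven is the legacy Sentry client. If your service uses the current sentry_sdk instead, an equivalent setup could look like the sketch below (assuming the aqueduct logger propagates to the root logger); this is an alternative, not part of the aqueduct documentation:

import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration


if os.getenv('SENTRY_ENABLED') == 'True':
    sentry_sdk.init(
        dsn=os.getenv('SENTRY_DSN'),
        # record INFO logs as breadcrumbs and send ERROR logs as Sentry events
        integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
    )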
Owner

avito.tech: avito.ru engineering team open source projects