Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
How to create the game Rock, Paper, Scissors in Python

Rock, Paper, Scissors! If you want to learn how to do interactive games using Python, then this is great start for you. In this code, You will learn h

SplendidSpidey 1 Dec 18, 2021
This is an online course where you can learn and master the skill of low-level performance analysis and tuning.

Performance Ninja Class This is an online course where you can learn to find and fix low-level performance issues, for example CPU cache misses and br

Denis Bakhvalov 1.2k Dec 30, 2022
Google Scholar App Using Python

Google Scholar App Watch the tutorial video How to build a Google Scholar App | Streamlit #30 Demo Launch the web app: Reproducing this web app To rec

Chanin Nantasenamat 4 Jun 05, 2022
Automatically deletes Capital One Eno virtual cards for when you've made a couple too many.

eno-delete Automatically deletes Capital One Eno virtual cards for when you've made a couple too many. Warning: Program will delete ALL virtual cards

h3x 3 Sep 29, 2022
Manjaro CN Repository

Manjaro CN Repository Automatically built packages based on archlinuxcn/repo and manjarocn/docker. Install Add manjarocn to /etc/pacman.conf: Please m

Manjaro CN 28 Jun 26, 2022
This is a Docker-based pipeline for preparing sextractor-ready multiwavelength images

Pipeline for creating NB422-detected (ODI) catalog The repository contains a Docker-based pipeline for preprocessing observational data. The pipeline

1 Sep 01, 2022
This simple script generates a backup of a given Python and R environment

Python Environment Backup It’s always good to maintain your Python and R Anaconda environment packages properly listed and well-kept in case you have

Andrew Laganaro 1 Jul 13, 2022
Write complicated anonymous functions other than lambdas in Python.

lambdex allows you to write multi-line anonymous function expression (called a lambdex) in an idiomatic manner.

Xie Jingyi 71 May 19, 2022
Automates the fixing of problems reported by yamllint by parsing its output

yamlfixer yamlfixer automates the fixing of problems reported by yamllint by parsing its output. Usage This software automatically fixes some errors a

OPT Nouvelle Caledonie 26 Dec 26, 2022
This repository collects nice scripts ("plugins") for the SimpleBot bot for DeltaChat.

Having fun with DeltaChat This repository collects nice scripts ("plugins") for the SimpleBot bot for DeltaChat. DeltaChat is a nice e-mail based mess

Valentin Brandner 3 Dec 25, 2021
Stopmagic gives you the power of creating amazing Stop Motion animations faster and easier than ever before.

Stopmagic gives you the power of creating amazing Stop Motion animations faster and easier than ever before. This project is maintained by Aldrin Mathew.

Aldrin's Art Factory 67 Dec 31, 2022
Gmvault: Backup and restore your gmail account

Gmvault: Backup and restore your gmail account Gmvault is a tool for backing up your gmail account and never lose email correspondence. Gmvault is ope

Guillaume Aubert 3.5k Jan 01, 2023
Capture screen and download off Roku based devices

rokuview Capture screen and download off Roku based devices Tested on Hisense TV with Roku OS built-in No guarantee this will work with all Roku model

3 May 27, 2021
A simple script written using symbolic python that takes as input a desired metric and automatically calculates and outputs the Christoffel Pseudo-Tensor, Riemann Curvature Tensor, Ricci Tensor, Scalar Curvature and the Kretschmann Scalar

A simple script written using symbolic python that takes as input a desired metric and automatically calculates and outputs the Christoffel Pseudo-Tensor, Riemann Curvature Tensor, Ricci Tensor, Scal

2 Nov 27, 2021
A Dungeon and Dragons Toolkit using Python

Pythons-Dungeons A Dungeon and Dragons Toolkit using Python Rules: -When you are commiting please don't delete parts of the code that are important -A

2 Oct 21, 2021
calculadora financiera hecha en python

Calculadora financiera Calculadora de factores financieros basicos, puede calcular tanto factores como expresiones algebraicas en funcion de dichos fa

crudo 5 Nov 10, 2021
A collection of examples of using cocotb for functional verification of VHDL designs with GHDL.

At the moment, this repo is in an early state and serves as a learning tool for me. So it contains a a lot of quirks and code which can be done much better by cocotb-professionals.

T. Meissner 7 Mar 10, 2022
Ice Skating Simulator for Winter and Christmas [yay]

Ice Skating Simulator for Winter and Christmas [yay]

1 Aug 21, 2022
Streamlit β€” The fastest way to build data apps in Python

Welcome to Streamlit πŸ‘‹ The fastest way to build and share data apps. Streamlit lets you turn data scripts into sharable web apps in minutes, not week

Streamlit 22k Jan 06, 2023
GitHub saver for stargazers, forks, repos

GitHub backup repositories Save your repos and list of stargazers & list of forks for them. Pure python3 and git with no dependencies to install. GitH

Alexander Kapitanov 23 Aug 21, 2022