Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increases RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes (see the sketch below)
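
Aqueduct manages the shared memory for you; the snippet below is only a conceptual sketch of the idea, built on the standard library's multiprocessing.shared_memory module (Python 3.8+) rather than Aqueduct's internal API.

import numpy as np
from multiprocessing import shared_memory

# producer side: copy a large array into a shared memory segment once
array = np.ones((1000, 1000), dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=array.nbytes)
shared = np.ndarray(array.shape, dtype=array.dtype, buffer=shm.buf)
shared[:] = array[:]

# consumer side (normally another process): attach by name, no pickling needed
shm_view = shared_memory.SharedMemory(name=shm.name)
received = np.ndarray(array.shape, dtype=np.float64, buffer=shm_view.buf)
assert received.sum() == array.sum()

# drop the numpy views before closing, otherwise close() raises BufferError
del shared, received
shm_view.close()
shm.close()
shm.unlink()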

Get started

A simple example of how to get started with Aqueduct using aiohttp. For more complete examples, see the examples directory.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
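
Once the app is running (aiohttp's default port is 8080), the endpoint can be exercised with a minimal client. The sketch below assumes the service is on localhost and sends the number 10, which the view parses with int().

import asyncio
from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        # the request body is the raw number that SumView reads and parses
        async with session.post('http://localhost:8080/sum', data=b'10') as resp:
            print(await resp.json())  # {'result': 285}: 0^2 + 1^2 + ... + 9^2 = 285


asyncio.run(main())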
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant exists just for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations; this is
        emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[ArrayFieldTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


# note: the code below uses top-level await, so run it inside an async function
# or an async REPL (for example: python -m asyncio)
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no task has a result yet
assert not any(task.result for task in tasks_batch)
# batched handling takes ~0.16 s, which is less than the ~0.22 s of sequential processing
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size exceeds the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
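
The timings in the comments follow from the constants above: a full batch of TASKS_BATCH_SIZE = 20 images takes IMAGE_PROCESS_TIME * 20 * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME = 0.01 * 20 * 0.7 + 0.02 = 0.16 s, while processing the same 20 tasks one by one takes roughly IMAGE_PROCESS_TIME * 20 + OVERHEAD_TIME = 0.22 s.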

Sentry

Aqueduct allows you to receive logger events from both the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') == 'true':  # os.getenv returns a string, never the bool True
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
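
Note that raven is Sentry's legacy client. For reference only, a roughly equivalent setup with the modern sentry-sdk package (an assumption, not something this project ships) might look like:

import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

if os.getenv('SENTRY_ENABLED') == 'true':
    sentry_sdk.init(
        dsn=os.getenv('SENTRY_DSN'),
        # hooks into the stdlib logging module globally, so events from
        # aqueduct's logger in both workers and the main process are captured
        integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
    )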
Owner

avito.tech (avito.ru engineering team open source projects)