PipeChain is a utility library for creating functional pipelines.

Overview

PipeChain

Motivation

PipeChain is a utility library for creating functional pipelines. Let's start with a motivating example. We have a list of Australian phone numbers from our users. We need to clean this data before we insert it into the database. With PipeChain, you can do this whole process in one neat pipeline:

from pipechain import PipeChain, PLACEHOLDER as _

nums = [
    "493225813",
    "0491 570 156",
    "55505488",
    "Barry",
    "02 5550 7491",
    "491570156",
    "",
    "1800 975 707"
]

PipeChain(
    nums
).pipe(
    # Remove spaces
    map, lambda x: x.replace(" ", ""), _
).pipe(
    # Remove non-numeric entries
    filter, lambda x: x.isnumeric(), _
).pipe(
    # Add the mobile code to the start of 8-digit numbers
    map, lambda x: "04" + x if len(x) == 8 else x, _
).pipe(
    # Add the 0 to the start of 9-digit numbers
    map, lambda x: "0" + x if len(x) == 9 else x, _
).pipe(
    # Convert to a set to remove duplicates
    set
).eval()
{'0255507491', '0455505488', '0491570156', '0493225813', '1800975707'}

Without PipeChain, we would have to horrifically nest our code, or else use a lot of temporary variables:

set(
    map(
        lambda x: "0" + x if len(x) == 9 else x,
        map(
            lambda x: "04" + x if len(x) == 8 else x,
            filter(
                lambda x: x.isnumeric(),
                map(
                    lambda x: x.replace(" ", ""),
                    nums
                )
            )
        )
    )
)
{'0255507491', '0455505488', '0491570156', '0493225813', '1800975707'}

Installation

pip install pipechain

Usage

Basic Usage

PipeChain has only two exports: PipeChain, and PLACEHOLDER.

PipeChain is a class that defines a pipeline. You create an instance of the class, and then call .pipe() to add another function onto the pipeline:

from pipechain import PipeChain, PLACEHOLDER
PipeChain(1).pipe(str)
PipeChain(arg=1, pipes=[functools.partial(
   
    )])

   

Finally, you call .eval() to run the pipeline and return the result:

PipeChain(1).pipe(str).eval()
'1'

You can "feed" the pipe at either end, either during construction (PipeChain("foo")), or during evaluation .eval("foo"):

PipeChain().pipe(str).eval(1)
'1'

Each call to .pipe() takes a function, and any additional arguments you provide, both positional and keyword, will be forwarded to the function:

PipeChain(["b", "a", "c"]).pipe(sorted, reverse=True).eval()
['c', 'b', 'a']

Argument Position

By default, the previous value is passed as the first positional argument to the function:

PipeChain(2).pipe(pow, 3).eval()
8

The only magic here is that if you use the PLACEHOLDER variable as an argument to .pipe(), then the pipeline will replace it with the output of the previous pipe at runtime:

PipeChain(2).pipe(pow, 3, PLACEHOLDER).eval()
9

Note that you can rename PLACEHOLDER to something more usable using Python's import statement, e.g.

from pipechain import PLACEHOLDER as _
PipeChain(2).pipe(pow, 3, _).eval()
9

Methods

It might not see like methods will play that well with this pipe convention, but after all, they are just functions. You should be able to access any object's method as a function by accessing it on that object's parent class. In the below example, str is the parent class of "":

"".join(["a", "b", "c"])
'abc'
PipeChain(["a", "b", "c"]).pipe(str.join, "", _).eval()
'abc'

Operators

The same goes for operators, such as +, *, [] etc. We just have to use the operator module in the standard library:

from operator import add, mul, getitem

PipeChain(5).pipe(mul, 3).eval()
15
PipeChain(5).pipe(add, 3).eval()
8
PipeChain(["a", "b", "c"]).pipe(getitem, 1).eval()
'b'

Test Suite

Note, you will need poetry installed.

To run the test suite, use:

git clone https://github.com/multimeric/PipeChain.git
cd PipeChain
poetry install
poetry run pytest test/test.py
Owner
Michael Milton
Michael Milton
Airflow ETL With EKS EFS Sagemaker

Airflow ETL With EKS EFS & Sagemaker (en desarrollo) Diagrama de la solución Imp

1 Feb 14, 2022
Analysiscsv.py for extracting analysis and exporting as CSV

wcc_analysis Lichess page documentation: https://lichess.org/page/world-championships Each WCC has a study, studies are fetched using: https://lichess

32 Apr 25, 2022
Modular analysis tools for neurophysiology data

Neuroanalysis Modular and interactive tools for analysis of neurophysiology data, with emphasis on patch-clamp electrophysiology. Functions for runnin

Allen Institute 5 Dec 22, 2021
Reading streams of Twitter data, save them to Kafka, then process with Kafka Stream API and Spark Streaming

Using Streaming Twitter Data with Kafka and Spark Reading streams of Twitter data, publishing them to Kafka topic, process message using Kafka Stream

Rustam Zokirov 1 Dec 06, 2021
MapReader: A computer vision pipeline for the semantic exploration of maps at scale

MapReader A computer vision pipeline for the semantic exploration of maps at scale MapReader is an end-to-end computer vision (CV) pipeline designed b

Living with Machines 25 Dec 26, 2022
Advanced Pandas Vault — Utilities, Functions and Snippets (by @firmai).

PandasVault ⁠— Advanced Pandas Functions and Code Snippets The only Pandas utility package you would ever need. It has no exotic external dependencies

Derek Snow 374 Jan 07, 2023
Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather

Tuplex 791 Jan 04, 2023
Supply a wrapper ``StockDataFrame`` based on the ``pandas.DataFrame`` with inline stock statistics/indicators support.

Stock Statistics/Indicators Calculation Helper VERSION: 0.3.2 Introduction Supply a wrapper StockDataFrame based on the pandas.DataFrame with inline s

Cedric Zhuang 1.1k Dec 28, 2022
Py-price-monitoring - A Python price monitor

A Python price monitor This project was focused on Brazil, so the monitoring is

Samuel 1 Jan 04, 2022
Repositori untuk menyimpan material Long Course STMKGxHMGI tentang Geophysical Python for Seismic Data Analysis

Long Course "Geophysical Python for Seismic Data Analysis" Instruktur: Dr.rer.nat. Wiwit Suryanto, M.Si Dipersiapkan oleh: Anang Sahroni Waktu: Sesi 1

Anang Sahroni 0 Dec 04, 2021
PyPSA: Python for Power System Analysis

1 Python for Power System Analysis Contents 1 Python for Power System Analysis 1.1 About 1.2 Documentation 1.3 Functionality 1.4 Example scripts as Ju

758 Dec 30, 2022
PyNHD is a part of HyRiver software stack that is designed to aid in watershed analysis through web services.

A part of HyRiver software stack that provides access to NHD+ V2 data through NLDI and WaterData web services

Taher Chegini 23 Dec 14, 2022
Full automated data pipeline using docker images

Create postgres tables from CSV files This first section is only relate to creating tables from CSV files using postgres container alone. Just one of

1 Nov 21, 2021
Projeto para realizar o RPA Challenge . Utilizando Python e as bibliotecas Selenium e Pandas.

RPA Challenge in Python Projeto para realizar o RPA Challenge (www.rpachallenge.com), utilizando Python. O objetivo deste desafio é criar um fluxo de

Henrique A. Lourenço 1 Apr 12, 2022
This repo contains a simple but effective tool made using python which can be used for quality control in statistical approach.

📈 Statistical Quality Control 📉 This repo contains a simple but effective tool made using python which can be used for quality control in statistica

SasiVatsal 8 Oct 18, 2022
Project under the certification "Data Analysis with Python" on FreeCodeCamp

Sea Level Predictor Assignment You will anaylize a dataset of the global average sea level change since 1880. You will use the data to predict the sea

Bhavya Gopal 3 Jan 31, 2022
MotorcycleParts DataAnalysis python

We work with the accounting department of a company that sells motorcycle parts. The company operates three warehouses in a large metropolitan area.

NASEEM A P 1 Jan 12, 2022
Created covid data pipeline using PySpark and MySQL that collected data stream from API and do some processing and store it into MYSQL database.

Created covid data pipeline using PySpark and MySQL that collected data stream from API and do some processing and store it into MYSQL database.

2 Nov 20, 2021
Universal data analysis tools for atmospheric sciences

U_analysis Universal data analysis tools for atmospheric sciences Script written in python 3. This file defines multiple functions that can be used fo

Luis Ackermann 1 Oct 10, 2021