An ETL framework + Monitoring UI/API (experimental project for learning purposes)


Fastlane

An ETL framework for building pipelines, and a Flask-based web API/UI for monitoring them.

Project structure

fastlane
|- fastlane: (ETL framework)
|- fastlane_web: (web API/UI for monitoring pipelines)
   |- migrations: (database migrations)
   |- web_api: Flask backend API
   |- web_ui: TBD

Install

  1. Clone the repository
  2. pip install -e .

Example

fastlane --source=mysql --target=s3 --config=examples/mysql_to_athena_example.json

--source: The pipeline's source type (mysql, bigquery, and mongodb are the only sources implemented so far)

--target: The pipeline's target type (s3, influxdb, mysql, and firehose are the only targets implemented so far)

--transform: The pipeline's transform type (default is the only transform implemented so far)

--config: The path to the pipeline's JSON configuration file (a hypothetical sketch follows this list)

--logs_to_slack: Send error logs to Slack

--logs_to_cloudwatch: Send logs to CloudWatch

--logs_to_file: Send logs to a file
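
For orientation, a configuration file could be produced as shown below. This is purely a hypothetical sketch; the actual field names are dictated by each source's and target's configuration_schema (described below) and by the files under examples/, not by this example.

import json

# Hypothetical configuration; the real fields are defined by the
# marshmallow schemas returned by each component's configuration_schema().
config = {
    "source": {"host": "localhost", "database": "shop", "table": "orders"},
    "target": {"bucket": "my-data-lake", "prefix": "orders/"},
}

with open("examples/my_pipeline.json", "w") as f:
    json.dump(config, f, indent=2)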

Extending the framework

The ETL framework is built around four concepts:

Source

The base class fastlane.source.Source provides basic functionality and defines a standard interface for extracting data from a particular source type. An instance of Source is responsible only for extracting data from the source and returning it as a Python list of dictionaries.

Implementations of the Source base class must provide at least the following functions:

str: """Return a string describing type of source this is, for example mysql or bigquery""" @classmethod def configuration_schema(cls) -> SourceConfigSchema: """Return a marshmallow schema inherited from SourceConfigSchema base schema. This schema is used to validate the sources configuration, so all possible fields should be covered in schema returned here."""">
from typing import List

from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class SourceImpl(Source):
    ...

    def extract(self) -> List[dict]:
        """This function should retrieve data from the source and return it as a list of dictionaries.
            The Source class is an iterator, and this function is called on each iteration. 
            The iterator stops (and source worker exits) when this function returns an empty list. 
            So when there are no more records to fetch, this function should return [].
        """

    @utils.classproperty
    def source_type(self) -> str:
        """Return a string describing type of source this is, for example mysql or bigquery"""

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        """Return a marshmallow schema inherited from SourceConfigSchema base schema.
            This schema is used to validate the sources configuration, so all possible fields should be covered in
            schema returned here."""

An example implementation of the Source interface is in fastlane.sources.impl.source_mysql.
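
For illustration only, a minimal source that reads a CSV file in batches might look like the sketch below. Nothing in it comes from the framework itself: the csv handling, the constructor signature, and the path and batch_size fields are all invented, and it assumes SourceConfigSchema is an ordinary marshmallow schema.

import csv
from typing import List

from marshmallow import fields

from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class CsvSourceConfigSchema(SourceConfigSchema):
    # Assumes the base schema can be extended with marshmallow fields.
    path = fields.String(required=True)
    batch_size = fields.Integer(missing=1000)


class CsvSource(Source):
    def __init__(self, configuration: dict):
        # Constructor signature assumed for the purpose of this sketch.
        self._reader = csv.DictReader(open(configuration["path"]))
        self._batch_size = configuration.get("batch_size", 1000)

    def extract(self) -> List[dict]:
        # Return up to batch_size rows per iteration; returning [] stops
        # the iterator and lets the source worker exit.
        batch = []
        for row in self._reader:
            batch.append(dict(row))
            if len(batch) >= self._batch_size:
                break
        return batch

    @utils.classproperty
    def source_type(self) -> str:
        return "csv"

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        return CsvSourceConfigSchema()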

Implementation Coverage

  • MySQL
  • BigQuery
  • MongoDB

Transform

The base class fastlane.transform.Transform provides basic functionality and defines a standard interface for transforming data so that it is ready for the target. An instance of Transform is responsible only for transforming data from the source into a format compatible with the target.

Implementations of the Transform base class must provide at least the following functions:

str: """Return a string describing type of transform this is.""" @classmethod def configuration_schema(cls) -> TransformConfigSchema: """Return a marshmallow schema inherited from TransformConfigSchema base schema. This schema is used to validate the transforms configuration, so all possible fields should be covered in schema returned here."""">
import pandas as pd

from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class TransformImpl(Transform):
    ...

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """This function should run any transformation on the dataframe and return the transformed dataframe.
            Ideally the same dataframe should be transformed in place, but if a new dataframe needs to be created,
            make sure to remove the old dataframe from memory.
            This function is called by the transform worker every time a new batch of source data has been received.
        """

    @utils.classproperty
    def transform_type(self) -> str:
        """Return a string describing type of transform this is."""

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        """Return a marshmallow schema inherited from TransformConfigSchema base schema.
            This schema is used to validate the transforms configuration, so all possible fields should be covered in
            schema returned here."""

An example implementation of the Transform interface is in fastlane.transform.impl.transform_default.
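
Again purely as a hypothetical sketch (the column renaming below is invented, not a framework feature), a small transform might look like this:

import pandas as pd

from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class CleanupTransform(Transform):
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # Mutate the dataframe in place, per the interface docs above,
        # rather than allocating a replacement.
        df.columns = [column.strip().lower() for column in df.columns]
        return df

    @utils.classproperty
    def transform_type(self) -> str:
        return "cleanup"

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        # No extra configuration is needed for this sketch, so the base
        # schema is returned unchanged.
        return TransformConfigSchema()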

Target

The base class fastlane.target.Target provides basic functionality and defines a standard interface for loading data into a destination. An instance of Target is responsible only for storing transformed data in the destination.

Implementations of the Target base class must provide at least the following functions:

str: """Return a string describing type of target this is.""" @classmethod def target_id(cls, configuration: dict) -> str: """Return a unique identifier from this specific targets configuration. The id should be unique across the whole target destination. For example the target_id for mysql target is built from table and database"""">
import pandas as pd

from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class TargetImpl(Target):
    ...

    def load(self, df: pd.DataFrame):
        """This function is called by the target worker every time a new batch of transformed data has been received.
            This function should store the dataframe in whatever destination it implements.
        """

    def get_offset(self):
        """Get the largest key which has been stored in the target. Used from incrementally loaded tables."""

    @utils.classproperty
    def target_type(self) -> str:
        """Return a string describing type of target this is."""

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        """Return a unique identifier from this specific targets configuration. 
            The id should be unique across the whole target destination. 
            For example the target_id for mysql target is built from table and database"""

An example implementation of the Target interface is in fastlane.target.impl.target_athena.
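
As one more hypothetical sketch (the file path, the offset_column field, and the constructor signature are all invented for illustration), a target that appends batches to a local CSV file might look like this:

import os

import pandas as pd

from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class CsvTarget(Target):
    def __init__(self, configuration: dict):
        # Constructor signature assumed for the purpose of this sketch.
        self._path = configuration["path"]
        self._offset_column = configuration.get("offset_column", "id")

    def load(self, df: pd.DataFrame):
        # Append each transformed batch to the destination file.
        header = not os.path.exists(self._path)
        df.to_csv(self._path, mode="a", header=header, index=False)

    def get_offset(self):
        # Largest key already stored, so incremental loads know where to resume.
        if not os.path.exists(self._path):
            return None
        return pd.read_csv(self._path)[self._offset_column].max()

    @utils.classproperty
    def target_type(self) -> str:
        return "csv"

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        # Unique across the destination: here, simply the file path.
        return configuration["path"]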

Implementation Coverage

  • S3
  • InfluxDB
  • MySQL
  • Firehose

Pipeline

The fastlane.pipeline.Pipeline class is what drives the ETL process. It manages the source, transform and target processes, and runs monitoring processes which give insight into the performance/state of the running pipeline.

The Pipeline class works by spawning a number of worker threads for each stage of the ETL process (source, transform, target). Each stage passes work to the next via Queues:

        _________________        Queue       ____________________         Queue        ________________    load
extract | source_worker | -->  [|.|.|.|] -->| transform_worker_1 | -->  [|.|.|.|] --> | target_worker_1 | ------>
------> |_______________|                    --------------------                      ----------------    load
                                         -->| transform_worker_2 |                --> | target_worker_2 | ------>
                                             --------------------                      ----------------    load
                                         -->| transform_worker_3 |                --> | target_worker_3 | ------>
                                             --------------------                      ----------------    load
                                                                                  --> | target_worker_4 | ------>
                                                                                       ----------------
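
The fan-out above can be sketched with the standard library alone. The following is a simplified illustration of the pattern with one worker per stage, not the actual Pipeline internals (which run several transform and target workers and coordinate their shutdown):

import queue
import threading

raw_q = queue.Queue(maxsize=100)          # source -> transform
transformed_q = queue.Queue(maxsize=100)  # transform -> target
STOP = object()                           # sentinel: upstream stage is done

def source_worker(extract):
    # Keep extracting batches until the source returns an empty list.
    while batch := extract():
        raw_q.put(batch)
    raw_q.put(STOP)

def transform_worker(transform):
    while (batch := raw_q.get()) is not STOP:
        transformed_q.put(transform(batch))
    transformed_q.put(STOP)

def target_worker(load):
    while (batch := transformed_q.get()) is not STOP:
        load(batch)

# Wire the stages together with trivial stand-in callables.
batches = iter([[{"id": 1}], [{"id": 2}], []])
workers = [
    threading.Thread(target=source_worker, args=(lambda: next(batches),)),
    threading.Thread(target=transform_worker, args=(lambda b: b,)),
    threading.Thread(target=target_worker, args=(print,)),
]
for w in workers:
    w.start()
for w in workers:
    w.join()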

Throughout the ETL process, a few small monitoring processes collect metrics at periodic intervals, such as memory usage, records loaded per second, total records loaded, and queue sizes. See fastlane.monitoring.pipeline_monitor for more details on how that's done.

Pipelines Web API

This project includes a pipeline web API built with Flask. It serves as a backend for collecting and storing metrics from running pipelines, and also serves the pipeline monitoring web UI.

Resources

/api/pipeline

CRUD on a single pipeline.

Methods: POST, GET, DELETE

/api/pipelines

List pipelines.

Methods: GET

/api/pipeline/run

Invocation of a particular pipeline.

Methods: POST, PUT, GET, DELETE

/api/pipeline/run/latest

Latest invocation of a particular pipeline.

Methods: GET

/api/pipeline/run/rps

Records-per-second metrics for a particular pipeline run.

Methods: GET, POST

/api/pipeline/run/memory_usage

Memory-usage metrics for a particular pipeline run.

Methods: GET, POST

/api/pipeline/run/logs

Logs (from CloudWatch) for a particular pipeline run.

Methods: GET, POST
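
A hypothetical client interaction might look like the following. The endpoint paths come from the list above, but the host, port, query parameters, and payload fields are assumptions for illustration, not the documented request schema:

import requests

BASE = "http://localhost:5000"  # assumed host/port for the Flask app

# List all registered pipelines.
pipelines = requests.get(f"{BASE}/api/pipelines").json()

# Fetch the latest run of one pipeline (query parameter assumed).
latest = requests.get(
    f"{BASE}/api/pipeline/run/latest",
    params={"pipeline_id": 1},
).json()

# Report a records-per-second metric for a run (payload fields assumed).
requests.post(f"{BASE}/api/pipeline/run/rps", json={"run_id": 1, "rps": 1250.0})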

Pipeline Web UI

Will provide a user interface to monitor currently running pipelines, as well as debug and analyze previously invoked pipelines.
