Overview

Hello!

This is my first repo covering, end to end, a data pipeline for Brazilian government open data on contract purchases and annual schedules, built with Spark, using PySpark and SQL!

The code can be found here, and the data can be obtained through this link.

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  # Read the raw CSV from the landing zone and rewrite it as parquet
  # in the processing zone.
  df = spark.read.option('header', True).csv(path_csv)
  df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  # Register the processing-zone parquet as a temp view so the SQL
  # transformation can reference it as 'df'.
  df = spark.read.parquet(path_parquet)
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
  # Apply the transformation query and write the result to the curated
  # zone, partitioned by year/month/day.
  df2 = spark.sql(QUERY)

  (
      df2
      # One orderBy over all three columns: chaining three separate
      # orderBy calls would keep only the last sort.
      .orderBy(['year', 'month', 'day'], ascending=False)
      .write.partitionBy('year', 'month', 'day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":

  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")

  # landing (csv) -> processing (parquet) -> curated (partitioned parquet)
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)

  write_curated(spark, PATH_CURATED_ZONE)

In short, the ETL has three steps (see the sketch after this list):

  • First, we extract the data into the landing zone, then write the same data to the processing zone in a different format, parquet in this case, since it is an optimized, lighter format.
  • Next, we create a view over the data just saved in the processing zone, already in parquet, which speeds up Spark reads, and apply a transformation query that enriches the schema and keeps only the 2021 and 2022 data, ready to be consumed.
  • Finally, we write the treated, enriched data to the curated zone, partitioned by year, month, and day, ready for consumption.
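
As a quick illustration of why the partitioning matters (a minimal sketch, not part of the repo; the path matches the variables above), a consumer can filter on the partition columns and Spark will only scan the matching directories:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

# year/month/day come from the directory layout, so this filter
# triggers partition pruning instead of a full scan
df = spark.read.parquet('../datalake/curated')
df.where('year = 2022 AND month = 1').show()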

To run the script, you can simply do, in a terminal:

spark-submit etl.py
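
If the run succeeds, the curated zone should contain Hive-style partition directories (exact values depend on the data), something like:

../datalake/curated/year=2022/month=1/day=15/part-...snappy.parquet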

You will also find the same code and ETL idea in notebooks, in PySpark and spark-sql versions.
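
For reference, here is a rough DataFrame-API equivalent of the SQL transformation above (a sketch under the same paths and schema, not the notebooks' exact code):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master('local[*]').getOrCreate()

df = spark.read.parquet('../datalake/processing')

curated = (
    df
    # same casts as the SQL CTE
    .withColumn('id', F.col('id').cast('integer'))
    .withColumn('contrato_id', F.col('contrato_id').cast('integer'))
    .withColumn('vencimento', F.col('vencimento').cast('date'))
    .withColumn('valor', F.col('valor').cast('decimal(10,2)'))
    # derive the partition columns from the due date
    .withColumn('year', F.year('vencimento'))
    .withColumn('month', F.month('vencimento'))
    .withColumn('day', F.dayofmonth('vencimento'))
    .where(F.col('year').isin(2021, 2022))
    .orderBy(F.desc('year'))
)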

I hope you like it!

If you have any questions, feel free to reach out on LinkedIn.

:)
