Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project covers the full process, from building a web crawler that extracts raw house price data to creating ETL pipelines with Google Cloud Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built with both Apache Beam (running on Cloud Dataflow) and Apache Spark (running on Cloud Dataproc) to load real estate transaction data into BigQuery, where it can be visualized in Data Studio. The project also uses a Cloud Function to watch for new files uploaded to the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Actual price registration data for real estate transactions in Taiwan has been released since 2012. It includes transaction information such as the position and area of the real estate, the total price of land and buildings, and parking space details. We can use the data to observe how house prices change over time or to predict house price trends in various regions.

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache Beam version: 2.33.0
  • PySpark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download the historical actual price data in CSV format, and upload the files to the Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open the crawler.py file, replace YOUR_DIR_PATH with a local directory for storing the downloaded data, replace projectID with your Google Cloud project ID, and replace GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py
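
For reference, a minimal sketch of the upload step the crawler performs, using the google-cloud-storage client. The directory, project ID, and bucket name are the placeholders mentioned above, and the function name is illustrative rather than the actual crawler code:

from pathlib import Path
from google.cloud import storage

def upload_csv_files(local_dir, bucket_name, project_id):
    # Upload every CSV file in local_dir to the given GCS bucket.
    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)
    for csv_path in Path(local_dir).glob("*.csv"):
        blob = bucket.blob(csv_path.name)
        blob.upload_from_filename(str(csv_path))

upload_csv_files("YOUR_DIR_PATH", "GCS_BUCKET_NAME", "projectID")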

3. Build ETL pipelines on GCP

There are two versions of the ETL pipeline; both read source files from Cloud Storage, apply some transformations, and load the data into BigQuery. One pipeline is based on Apache Beam and uses Dataflow to process the data for land transaction analytics. The other is based on Apache Spark and uses Dataproc to process the data for building transaction analytics.

Let’s start by opening a session in Google Cloud Shell. Run the following command to set the project property to your project ID.

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the ETL pipeline with Apache Beam. We can upload the file using the Cloud Shell Editor.

Run etl_pipeline_beam.py to create a Dataflow job which uses the DataflowRunner. Notice that we need to set the Cloud Storage locations of the staging and temporary files, and set the region in which the created job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session
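
As a rough sketch, a Beam pipeline of this shape reads the CSV files from Cloud Storage, parses each row, and writes the records to BigQuery. The source path, column names, and output table below are illustrative assumptions; the actual transforms live in etl_pipeline_beam.py:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    # Picks up --project, --region, --runner, --staging_location, etc. from the command line.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadCSV" >> beam.io.ReadFromText("gs://BUCKET_NAME/land_data/*.csv", skip_header_lines=1)
            | "SplitColumns" >> beam.Map(lambda line: line.split(","))
            | "ToDict" >> beam.Map(lambda cols: {"city": cols[0], "total_price": int(cols[1])})  # hypothetical columns
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                "projectID:housing.land_data",  # hypothetical dataset.table
                schema="city:STRING,total_price:INTEGER",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )

if __name__ == "__main__":
    run()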

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the ETL pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name and the region in which the created job should run. To write data to BigQuery, the spark-bigquery-connector JAR file must be available at runtime.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
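
As a rough sketch, the Spark job can read the CSV files from Cloud Storage, transform them with DataFrame operations, and write the result to BigQuery through the connector. The paths, columns, and table names below are illustrative assumptions; the actual logic lives in etl_pipeline_spark.py:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("building-data-etl").getOrCreate()

# Read the raw CSV files from Cloud Storage (path is illustrative).
df = spark.read.option("header", True).csv("gs://BUCKET_NAME/building_data/*.csv")

# Example transformation: cast the price column and drop rows without a price.
df = df.withColumn("total_price", F.col("total_price").cast("long")).dropna(subset=["total_price"])

# Write to BigQuery via the spark-bigquery connector; the indirect write
# method needs a temporary GCS bucket for staging.
(
    df.write.format("bigquery")
      .option("table", "projectID.housing.building_data")   # hypothetical dataset.table
      .option("temporaryGcsBucket", "BUCKET_NAME")
      .mode("append")
      .save()
)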

4. Use a Cloud Function to trigger Cloud Dataflow

Use a Cloud Function to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template for running the data pipeline via a REST API request issued by the Cloud Function. The file etl_pipeline_beam_auto.py contains the Python code for the ETL pipeline with Apache Beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage locations of the staging, temporary, and template files, and set the region in which the created job should run.

python -m etl_pipeline_beam_auto \
    --runner DataflowRunner \
    --project projectID \
    --region=region \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/template \
    --save_main_session
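
For the template to accept a runtime parameter, the pipeline typically declares it as a ValueProvider option. A minimal sketch of what etl_pipeline_beam_auto.py might declare, assuming the parameter is named input as in the Cloud Function section below:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime parameter: the GCS path of the newly uploaded file,
        # supplied by the Cloud Function when it launches the template.
        parser.add_value_provider_argument("--input", type=str)

def run():
    options = PipelineOptions()
    user_options = options.view_as(UserOptions)
    with beam.Pipeline(options=options) as p:
        lines = p | "ReadInput" >> beam.io.ReadFromText(user_options.input)
        # ... same transforms and BigQuery sink as in etl_pipeline_beam.py ...

if __name__ == "__main__":
    run()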

Create a Cloud Function

Go to the Cloud Functions GUI and manually create a function: set the trigger to Cloud Storage, the event type to Finalize/Create, and choose the GCS bucket that needs to be monitored. Next, write the function itself using the code in the main.py file. Note that the user-defined parameter input is passed to the Dataflow pipeline job. Finally, click on Deploy; the function is now ready to execute and will start the Dataflow pipeline whenever a file is uploaded to your bucket.
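
A minimal sketch of what main.py can look like: a background function that receives the Cloud Storage event and launches the Dataflow template through the Dataflow REST API (using google-api-python-client). The project ID, region, job name, and template path are placeholders:

from googleapiclient.discovery import build

PROJECT = "projectID"
REGION = "region"
TEMPLATE_PATH = "gs://BUCKET_NAME/template"

def trigger_dataflow(event, context):
    # Background Cloud Function triggered by a Finalize/Create event on the bucket.
    file_path = "gs://{}/{}".format(event["bucket"], event["name"])
    dataflow = build("dataflow", "v1b3", cache_discovery=False)
    request = dataflow.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE_PATH,
        body={
            "jobName": "house-price-etl",
            "parameters": {"input": file_path},  # passed to the pipeline's --input option
        },
    )
    request.execute()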

Results

When each ETL pipeline has completed successfully, navigate to BigQuery to verify that the data has been loaded into the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate the average price by year in BigQuery and visualize the results in Data Studio.
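
A sketch of that aggregation using the BigQuery Python client; the dataset, table, and column names are assumptions and should be adjusted to the actual schema:

from google.cloud import bigquery

client = bigquery.Client(project="projectID")

query = """
    SELECT
      EXTRACT(YEAR FROM transaction_date) AS year,   -- hypothetical column
      AVG(unit_price) AS avg_price                   -- hypothetical column
    FROM `projectID.housing.land_data`               -- hypothetical table
    GROUP BY year
    ORDER BY year
"""

for row in client.query(query).result():
    print(row.year, row.avg_price)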

Data Studio - Average land price by year in Yilan County
