Crypto Stats and Tweets Data Pipeline using Airflow

Overview

This project came about through Udacity's nanodegree program.

For the capstone project within the nanodegree, the goal is to build a data pipeline that uses the technologies and applications covered in the program.

With the recent rise of interest in cryptocurrency and the emergence of crypto Twitter into the media spotlight, building my capstone project around these two areas seemed like a good fit.

The ultimate goal of this project is to create both crypto statistics and crypto tweets datasets that can be used in downstream applications.

That goal was accomplished through this project. I also have further goals for it, which are discussed under Future Work below.

Project Requirements

At least 2 data sources

  • twitter.com, accessed through the snscrape Python library
  • the CoinGecko public API, providing cryptocurrency statistical data dating back to 2015

More than 1 million lines of data.

  • The snscrape_tweets_hist dataset has over 1.5 million rows.
  • The coin_stats_hist dataset has over 250k rows.

At least two data sources/formats (CSV, API, JSON)

  • Stored in S3 (mkgpublic)
    • mkgpublic/capstone/tweets/tweets.parquet
    • mkgpublic/capstone/crypto/cg_hourly.csv
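
For anyone who wants to explore the data directly, here is a minimal sketch of reading both files with pandas, assuming the mkgpublic bucket allows anonymous reads and that pyarrow and s3fs are installed:

    import pandas as pd

    # Read the public datasets straight from S3; drop the anon option
    # if you are using AWS credentials instead.
    tweets = pd.read_parquet(
        "s3://mkgpublic/capstone/tweets/tweets.parquet",
        storage_options={"anon": True},
    )
    crypto = pd.read_csv(
        "s3://mkgpublic/capstone/crypto/cg_hourly.csv",
        storage_options={"anon": True},
    )
    print(tweets.shape, crypto.shape)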

Data Ingestion Process

Tweets

The original data ingestion process ran into a few snafus. I initially decided to use the Twitter API to gather the tweets; however, due to limitations within the Twitter API, I couldn't get more than 1,000 tweets per call.

Thus, I decided to use the snscrape Python library instead, which provided a much easier way to gather a large volume of tweets in a reasonable amount of time.

Using the snscrape Python library, the tweets were gathered by running a library function.
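
For illustration, here is a minimal sketch of that gathering step; the query string and tweet limit are hypothetical, not the exact parameters used in this project:

    import snscrape.modules.twitter as sntwitter

    tweets = []
    # Hypothetical query: tweets mentioning bitcoin from 2021 onward.
    for i, tweet in enumerate(
        sntwitter.TwitterSearchScraper("bitcoin since:2021-01-01").get_items()
    ):
        if i >= 10_000:  # stop after an arbitrary limit
            break
        tweets.append(
            {
                "date": tweet.date,
                "user": tweet.user.username,
                # Newer snscrape releases renamed this attribute to rawContent.
                "content": tweet.content,
            }
        )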

The tweets were then stored in a MongoDB database as an intermediary storage solution.

Data was continuously ingested through this process until enough tweets about various cryptocurrencies had been gathered.

After being staged in MongoDB, the tweets were pulled from the database, loaded into a pandas DataFrame, and written to the mkgpublic S3 bucket as a Parquet file.
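
A sketch of that staging round trip, assuming a local MongoDB instance and hypothetical database/collection names (writing Parquet to S3 also requires s3fs):

    import pandas as pd
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # hypothetical connection string
    collection = client["capstone"]["tweets"]          # hypothetical db/collection names

    # Stage the scraped tweets in MongoDB.
    collection.insert_many(tweets)

    # Pull them back into pandas and write to S3 as Parquet.
    df = pd.DataFrame(list(collection.find({}, {"_id": 0})))
    df.to_parquet("s3://mkgpublic/capstone/tweets/tweets.parquet", index=False)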

Crypto

Using the CoinGecko API, cryptocurrency statistical data was pulled and stored in a pandas DataFrame.

From the DataFrame, the data was then written to the same MongoDB database used for the tweets.

Data was continuously ingested through this process until enough statistical data about various cryptocurrencies had been stored.

Finally, the cryptocurrency statistical data was pulled from the MongoDB database, loaded into a pandas DataFrame, and written to the mkgpublic S3 bucket as a CSV file.

Note: I stored the data as a CSV because two different data formats were requested. I originally chose to store the crypto stats data as JSON, but even when partitioning it into several JSON files, the files were too big for Airflow to handle, so I went with CSV.
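
A sketch of the CoinGecko pull, using the public market_chart/range endpoint; the coin id and date range are illustrative, and writing the CSV to S3 again assumes s3fs:

    import pandas as pd
    import requests

    # CoinGecko returns [timestamp_ms, value] pairs per metric; ranges of
    # 1-90 days come back at hourly granularity.
    url = "https://api.coingecko.com/api/v3/coins/bitcoin/market_chart/range"
    params = {"vs_currency": "usd", "from": 1609459200, "to": 1612137600}
    payload = requests.get(url, params=params, timeout=30).json()

    df = pd.DataFrame(payload["prices"], columns=["timestamp_ms", "price"])
    df["date"] = pd.to_datetime(df["timestamp_ms"], unit="ms")
    df.to_csv("s3://mkgpublic/capstone/crypto/cg_hourly.csv", index=False)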

Crypto Stats and Tweets ELT

Now we get to the data ingestion and processing part of this Udacity capstone project.

Ultimately, I chose to follow a similar process to the one in the mkg_airflow repository, using Airflow to run a sequence of tasks.

Main Scripts

  • dags/tweets_and_crypto_etl.py
  • plugins/helpers/sql_queries.py
  • plugins/operators/stage_redshift.py
  • plugins/operators/load_dimension.py
  • plugins/operators/load_fact.py
  • plugins/helpers/analysis.py
  • plugins/operators/data_quality.py
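
As a rough sketch, the DAG chains these operators in sequence; the task ids below are placeholders standing in for the custom operators, not the exact contents of dags/tweets_and_crypto_etl.py:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator  # Airflow 2.x import path

    with DAG(
        "tweets_and_crypto_etl",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    ) as dag:
        create_cluster = DummyOperator(task_id="create_redshift_cluster")
        stage = DummyOperator(task_id="stage_from_s3")
        load_dims = DummyOperator(task_id="load_dimensions")
        load_facts = DummyOperator(task_id="load_facts")
        quality = DummyOperator(task_id="data_quality_checks")
        unload = DummyOperator(task_id="unload_to_s3")
        delete_cluster = DummyOperator(task_id="delete_redshift_cluster")

        (create_cluster >> stage >> load_dims >> load_facts
         >> quality >> unload >> delete_cluster)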

Data Model

Udacity Capstone Project Data Model
  1. Data is loaded into the staging tables cg_coin_list_stg, snscrape_tweets_stg, and cg_hourly_stg on a Redshift cluster from the S3 bucket
  2. Date information is loaded into date_dim
  3. Data is loaded into the cg_coin_list table from cg_coin_list_stg
  4. Data is loaded into coin_stats_hist via a join between date_dim, cg_hourly_stg, and cg_coin_list, using date keys and coin names to resolve the foreign keys
  5. Data is loaded into snscrape_tweets_hist via a join between date_dim, snscrape_tweets_stg, and cg_coin_list, using date keys and coin names to resolve the foreign keys

Ultimately, this data model was chosen because the end state will combine crypto price action with tweet sentiment to determine how the market reacts to price movements. We therefore need a relationship between the crypto and tweets datasets in order to one day achieve that future state.
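
For a sense of what the fact-table load in step 4 could look like, here is a hedged sketch as a query string; the column names and date-key derivation are assumptions, not the exact SQL in plugins/helpers/sql_queries.py:

    # Hypothetical columns; the real query lives in plugins/helpers/sql_queries.py.
    coin_stats_hist_insert = """
        INSERT INTO coin_stats_hist (date_key, coin_key, price, market_cap, volume)
        SELECT d.date_key, c.coin_key, s.price, s.market_cap, s.volume
        FROM cg_hourly_stg s
        JOIN date_dim d ON d.date_key = TO_CHAR(s.ts, 'YYYYMMDD')::INT
        JOIN cg_coin_list c ON c.coin_name = s.coin_name;
    """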

Steps

Airflow Udacity Capstone DAG
  1. Create Redshift cluster
  2. Create crypto, tweets, and dim schemas
  3. Create crypto/tweets staging and dim tables
  4. Staging
    • Stage the CoinGecko token list mapping table
    • Stage the CoinGecko hourly cryptocurrency statistics table
    • Stage the snscrape tweets crypto Twitter table
  5. Load dimensions
    • Load the CoinGecko token list mapping table
    • Load date_dim with date information from the CoinGecko hourly staging table
    • Load date_dim with date information from the snscrape tweets staging table
  6. Create fact tables
  7. Load fact tables
    • Load the cryptocurrency statistics history table
    • Load the snscrape tweets history table
  8. Run data quality checks
    • Select statements that make sure data is actually present
  9. Build an aggregate table with min and max statistic values per month from the coin_stats_hist table
  10. Store the resulting dim, fact, and aggregate tables in S3
  11. Delete the Redshift cluster
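
A minimal sketch of the kind of check the data quality step might run; the hook usage and connection id are assumptions about plugins/operators/data_quality.py:

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    def check_table_has_rows(redshift_conn_id: str, table: str) -> None:
        # Fail loudly if the table is empty, which fails the Airflow task.
        hook = PostgresHook(postgres_conn_id=redshift_conn_id)
        records = hook.get_records(f"SELECT COUNT(*) FROM {table}")
        if not records or records[0][0] < 1:
            raise ValueError(f"Data quality check failed: {table} has no rows")

    for table in ("coin_stats_hist", "snscrape_tweets_hist", "date_dim"):
        check_table_has_rows("redshift", table)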

Future Work and Final Thoughts

Some questions for future work:

  • What if the data were increased 100x?
    • I would use a Spark EMR cluster to process the data, as that would speed up both the data ingestion and the processing parts of the project.
    • This is likely to happen in my future steps for this project, so it will be added in future versions.
  • What if the pipeline had to run daily by 7 a.m.?
    • I would need to make the first part of the process more reliable. The issue is that sometimes either the CoinGecko API or snscrape breaks, so if this pipeline needed to run every day by 7 a.m., I would have to make the initial data ingestion into my S3 bucket more automated.
    • Nonetheless, if we are just referring to the S3 --> Redshift --> S3 part of the process, then I would set Airflow to run the current ELT process daily, since the initial API --> MongoDB --> S3 part would already be taken care of (see the scheduling sketch after this list).
    • I would also need to add an extra step so that the pipeline combines the data previously stored in the S3 bucket with the newly added data.
  • What if the database needed to be accessed by 100+ people?
    • If the database needs to be accessed by 100+ people, then I would need to either:
      • constantly run a Redshift cluster with the tables stored in said cluster (this requires additional IAM configuration and security protocols),
      • store the results in MongoDB so everyone can pull from that database using pandas (requires adding everyone's IP to the MongoDB network), or
      • have users simply pull from the mkgpublic S3 bucket (they just need the S3 URI) and use a platform like Databricks to run analysis.
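
For the daily-by-7-a.m. scenario, the scheduling change itself is small; a sketch, assuming UTC and leaving headroom before the deadline:

    from datetime import datetime

    from airflow import DAG

    with DAG(
        "tweets_and_crypto_etl",
        start_date=datetime(2021, 1, 1),
        schedule_interval="0 5 * * *",  # 5 a.m. daily, finishing well before 7 a.m.
        catchup=False,
    ) as dag:
        ...  # same task sequence as the current DAG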

Future Work

Ultimately, I want to use these datasets as the backend to a dashboard hosted on a website.

I also want to incorporate Reddit data into the mix. Afterwards, I want to run sentiment analysis on both the tweets and Reddit thread datasets to determine the current crypto market sentiment.

Work will be done over the next few months on the above tasks.

Owner

Matthew Greene
Backend Engineer