This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the SDK installed, and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner:

Description of the pipeline

Data input

We are using here a public PubSub topic with data, so we don't need to setup our own to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata, that is useful for streaming pipelines. In this case, the messages contain an attribute of name ts, which contains the same timestamp as the field of name timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud cli utility (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or if you have jq (for pretty printing of JSON)

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the GCloud cli utility installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:

After that, you can create the destination tables with the provided script

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are 10 seconds within each other. Any message with a timestamp more than 10 seconds apart will be discarded (for old timestamps) or will open a new window (for newer timestamps).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session, as the difference between the min and max timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate with all the messages in a window. This will load all the messages in the window into memory. This is fine, as in Beam streaming, a window is always processed in a worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. This grouping of messages across ride_id and event timestamps is automatically done by the pipeline, and we just need to express the generic operations to be performed with each window, as part of our pipeline.

Running the pipeline

Prerequirements

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:

Make sure that you have a Python environment with Python 3 (<3.9). For instance a virtualenv, and install apache-beam[gcp] and python-dateutil in your local environment. For instance, assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extracts insights out of them.

In this repository, you will find some sample output in data/beam.prof, that you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Owner
Israel Herraiz
Strategic Cloud Engineer @GoogleCloudPlatform
Israel Herraiz
Uniform Manifold Approximation and Projection

UMAP Uniform Manifold Approximation and Projection (UMAP) is a dimension reduction technique that can be used for visualisation similarly to t-SNE, bu

Leland McInnes 6k Jan 08, 2023
Regress.me is an easy to use data visualization tool powered by Dash/Plotly.

Regress.me Regress.me is an easy to use data visualization tool powered by Dash/Plotly. Regress.me.-.Google.Chrome.2022-05-10.15-58-59.mp4 Get Started

Amar 14 Aug 14, 2022
Backend app for visualizing CANedge log files in Grafana (directly from local disk or S3)

CANedge Grafana Backend - Visualize CAN/LIN Data in Dashboards This project enables easy dashboard visualization of log files from the CANedge CAN/LIN

13 Dec 15, 2022
a python function to plot a geopandas dataframe

Pretty GeoDataFrame A minimum python function (~60 lines) to draw pretty geodataframe. Based on matplotlib, shapely, descartes. Installation just use

haoming 27 Dec 05, 2022
The official colors of the FAU as matplotlib/seaborn colormaps

FAU - Colors The official colors of Friedrich-Alexander-Universität Erlangen-Nürnberg (FAU) as matplotlib / seaborn colormaps. We support the old colo

Machine Learning and Data Analytics Lab FAU 9 Sep 05, 2022
Resources for teaching & learning practical data visualization with python.

Practical Data Visualization with Python Overview All views expressed on this site are my own and do not represent the opinions of any entity with whi

Paul Jeffries 98 Sep 24, 2022
Visualization Website by using Dash and Heroku

Visualization Website by using Dash and Heroku You can visit the website https://payroll-expense-analysis.herokuapp.com/ In this project, I am interes

YF Liu 1 Jan 14, 2022
Cartopy - a cartographic python library with matplotlib support

Cartopy is a Python package designed to make drawing maps for data analysis and visualisation easy. Table of contents Overview Get in touch License an

1.2k Jan 01, 2023
Create Badges with stats of Scratch User, Project and Studio. Use those badges in Github readmes, etc.

Scratch-Stats-Badge Create customized Badges with stats of Scratch User, Studio or Project. Use those badges in Github readmes, etc. Examples Document

Siddhesh Chavan 5 Aug 28, 2022
Simple Python interface for Graphviz

Simple Python interface for Graphviz

Sebastian Bank 1.3k Dec 26, 2022
PanGraphViewer -- show panenome graph in an easy way

PanGraphViewer -- show panenome graph in an easy way Table of Contents Versions and dependences Desktop-based panGraphViewer Library installation for

16 Dec 17, 2022
A guide for using Bootstrap 5 classes in Dash Bootstrap Components V1

dash-bootstrap-cheatsheet This handy interactive cheatsheet makes it easy to use the Bootstrap 5 classes with your Dash app made with the latest versi

10 Dec 22, 2022
CLAHE Contrast Limited Adaptive Histogram Equalization

A simple code to process images using contrast limited adaptive histogram equalization. Image processing is becoming a major part of data processig.

Happy N. Monday 4 May 18, 2022
Create a table with row explanations, column headers, using matplotlib

Create a table with row explanations, column headers, using matplotlib. Intended usage was a small table containing a custom heatmap.

4 Aug 14, 2022
又一个云探针

ServerStatus-Murasame 感谢ServerStatus-Hotaru,又一个云探针诞生了(大雾 本项目在ServerStatus-Hotaru的基础上使用fastapi重构了服务端,部分修改了客户端与前端 项目还在非常原始的阶段,可能存在严重的问题 演示站:https://stat

6 Oct 19, 2021
Generate the report for OCULTest.

Sample report generated in this function Usage example from utils.gen_report import generate_report if __name__ == '__main__': # def generate_rep

Philip Guo 1 Mar 10, 2022
A gui application to visualize various sorting algorithms using pure python.

Sorting Algorithm Visualizer A gui application to visualize various sorting algorithms using pure python. Language : Python 3 Libraries required Tkint

Rajarshi Banerjee 19 Nov 30, 2022
This package creates clean and beautiful matplotlib plots that work on light and dark backgrounds

This package creates clean and beautiful matplotlib plots that work on light and dark backgrounds. Inspired by the work of Edward Tufte.

Nico Schlömer 205 Jan 07, 2023
阴阳师后台全平台(使用网易 MuMu 模拟器)辅助。支持御魂,觉醒,御灵,结界突破,秘闻副本,地域鬼王。

阴阳师后台全平台辅助 Python 版本:Python 3.8.3 模拟器:网易 MuMu | 雷电模拟器 模拟器分辨率:1024*576 显卡渲染模式:兼容(OpenGL) 兼容 Windows 系统和 MacOS 系统 思路: 利用 adb 截图后,使用 opencv 找图找色,模拟点击。使用

简讯 27 Jul 09, 2022
YOPO is an interactive dashboard which generates various standard plots.

YOPO is an interactive dashboard which generates various standard plots.you can create various graphs and charts with a click of a button. This tool uses Dash and Flask in backend.

ADARSH C 38 Dec 20, 2022