
Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need the Apache Beam SDK installed and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner.
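For instance, you can authenticate and set your default project with the gcloud CLI (a hedged example; replace the placeholder with your own project id):

gcloud auth application-default login
gcloud config set project <your-project-id>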

Description of the pipeline

Data input

We use a public PubSub topic with data here, so we don't need to set up our own topic to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata that is useful for streaming pipelines. In this case, each message carries an attribute named ts, which contains the same timestamp as the timestamp field in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.
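For example, here is a minimal sketch (not the repository's actual code) of how a Beam pipeline can use that ts attribute, rather than the publish time, as the event timestamp when reading:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# timestamp_attribute tells Beam to use the "ts" metadata attribute as the
# event timestamp of each element, instead of the PubSub publish time.
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    messages = p | beam.io.ReadFromPubSub(
        topic="projects/pubsub-public-data/topics/taxirides-realtime",
        timestamp_attribute="ts")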

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud CLI (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or, if you have jq installed (for pretty-printing the JSON):

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (the metadata). You will see that the timestamp is included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.
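As a hedged sketch, a write with those semantics looks like this in Beam (the table and helper names here are illustrative assumptions, not this repository's actual code or schema):

import apache_beam as beam

def write_sessions(rows):
    # rows is a PCollection of dicts whose keys match the destination schema.
    # CREATE_NEVER enforces that the table already exists, as noted above;
    # WRITE_APPEND plus streaming inserts gives append-only streaming writes.
    return rows | beam.io.WriteToBigQuery(
        table="your-project:taxi_rides.sessions",  # hypothetical dataset.table
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)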

If you have the gcloud CLI installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You also need to create a BigQuery dataset, in the same region:
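For instance, with the bq CLI (where $REGION is the region you will use for Dataflow, and taxi_rides matches the dataset name passed to the script below):

bq --location=$REGION mk --dataset taxi_rides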

After that, you can create the destination tables with the provided script:

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are within 10 seconds of each other. Any message whose timestamp is more than 10 seconds apart will either be discarded (for old timestamps) or open a new window (for newer timestamps).

With the messages inside each window (that is, each distinct ride_id will be part of a different window), we calculate the duration of the session as the difference between the minimum and maximum timestamps in the window. We also calculate the number of events in that session.

We use a GroupByKey to operate on all the messages in a window. This loads all the messages in the window into memory. That is fine: in Beam streaming, a window is always processed by a single worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented by leveraging windows in streaming pipelines. The grouping of messages by ride_id and event timestamp is done automatically by the pipeline; we just need to express the generic operations to be performed on each window, as in the sketch below.
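Here is a minimal sketch of that logic (illustrative only; the actual pipeline code in this repository may structure things differently):

import json
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
from dateutil import parser

def parse_message(payload):
    # Key each message by ride_id; the value is the event time in epoch seconds.
    record = json.loads(payload.decode("utf-8"))
    return record["ride_id"], parser.isoparse(record["timestamp"]).timestamp()

def session_stats(element):
    # element is (ride_id, [timestamps]) for one session window.
    ride_id, timestamps = element
    return {
        "ride_id": ride_id,
        "duration": max(timestamps) - min(timestamps),
        "n_events": len(timestamps),
    }

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (p
     | "Read" >> beam.io.ReadFromPubSub(
         topic="projects/pubsub-public-data/topics/taxirides-realtime",
         timestamp_attribute="ts")
     | "Parse" >> beam.Map(parse_message)
     | "Sessions" >> beam.WindowInto(window.Sessions(gap_size=10))
     | "Group" >> beam.GroupByKey()
     | "Stats" >> beam.Map(session_stats))
    # From here the rows would be written to BigQuery (see Data output above).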

Running the pipeline

Prerequisites

You need a Google Cloud project and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (where gcloud is automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:
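For instance (where $REGION is the region where you will run Dataflow):

export PROJECT=$(gcloud config get-value project)
gsutil mb -l $REGION gs://$PROJECT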

Make sure that you have a Python 3 environment (Python < 3.9), for instance a virtualenv, and install apache-beam[gcp] and python-dateutil in it. Assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh to set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh
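Under the hood, a launch script like this typically boils down to a python invocation along these lines (a hypothetical example; the entry point, bucket layout and exact flags are assumptions, so check the actual script):

python main.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --region $REGION \
  --temp_location gs://$PROJECT/tmp \
  --streaming \
  --profile_cpu \
  --profile_location gs://$PROJECT/profile

The --profile_cpu and --profile_location options are what make Dataflow write the profiling files mentioned below.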

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extract insights from them.
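For a first look, the standard library's pstats module can read such files, assuming they are regular cProfile dumps; here is a minimal sketch using the sample profile at data/beam.prof (described below):

import pstats

# Load the sample profile and print the 10 entries with the highest
# cumulative time.
stats = pstats.Stats("data/beam.prof")
stats.sort_stats("cumulative").print_stats(10)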

In this repository you will find some sample output in data/beam.prof, which you can use to check what the profiling output looks like. Use the following Colab notebook for an example analysis of that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
