This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the SDK installed, and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner:

Description of the pipeline

Data input

We are using here a public PubSub topic with data, so we don't need to setup our own to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata, that is useful for streaming pipelines. In this case, the messages contain an attribute of name ts, which contains the same timestamp as the field of name timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud cli utility (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or if you have jq (for pretty printing of JSON)

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the GCloud cli utility installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:

After that, you can create the destination tables with the provided script

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are 10 seconds within each other. Any message with a timestamp more than 10 seconds apart will be discarded (for old timestamps) or will open a new window (for newer timestamps).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session, as the difference between the min and max timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate with all the messages in a window. This will load all the messages in the window into memory. This is fine, as in Beam streaming, a window is always processed in a worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. This grouping of messages across ride_id and event timestamps is automatically done by the pipeline, and we just need to express the generic operations to be performed with each window, as part of our pipeline.

Running the pipeline

Prerequirements

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:

Make sure that you have a Python environment with Python 3 (<3.9). For instance a virtualenv, and install apache-beam[gcp] and python-dateutil in your local environment. For instance, assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extracts insights out of them.

In this repository, you will find some sample output in data/beam.prof, that you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Owner
Israel Herraiz
Strategic Cloud Engineer @GoogleCloudPlatform
Israel Herraiz
An application that allows you to design and test your own stock trading algorithms in an attempt to beat the market.

StockBot is a Python application for designing and testing your own daily stock trading algorithms. Installation Use the

Ryan Cullen 280 Dec 19, 2022
Custom Plotly Dash components based on Mantine React Components library

Dash Mantine Components Dash Mantine Components is a Dash component library based on Mantine React Components Library. It makes it easier to create go

Snehil Vijay 239 Jan 08, 2023
Streamlit dashboard examples - Twitter cashtags, StockTwits, WSB, Charts, SQL Pattern Scanner

streamlit-dashboards Streamlit dashboard examples - Twitter cashtags, StockTwits, WSB, Charts, SQL Pattern Scanner Tutorial Video https://ww

122 Dec 21, 2022
A visualization tool made in Pygame for various pathfinding algorithms.

Pathfinding-Visualizer 🚀 A visualization tool made in Pygame for various pathfinding algorithms. Pathfinding is closely related to the shortest path

Aysha sana 7 Jul 09, 2022
A programming language built on top of Python to easily allow Swahili speakers to get started with programming without ever knowing English

pyswahili A programming language built over Python to easily allow swahili speakers to get started with programming without ever knowing english pyswa

Jordan Kalebu 72 Dec 15, 2022
A Graph Learning library for Humans

A Graph Learning library for Humans These novel algorithms include but are not limited to: A graph construction and graph searching class can be found

Richard Tjörnhammar 1 Feb 08, 2022
The repository is my code for various types of data visualization cases based on the Matplotlib library.

ScienceGallery The repository is my code for various types of data visualization cases based on the Matplotlib library. It summarizes the code and cas

Warrick Xu 2 Apr 20, 2022
Simple Inkscape Scripting

Simple Inkscape Scripting Description In the Inkscape vector-drawing program, how would you go about drawing 100 diamonds, each with a random color an

Scott Pakin 140 Dec 27, 2022
Small U-Net for vehicle detection

Small U-Net for vehicle detection Vivek Yadav, PhD Overview In this repository , we will go over using U-net for detecting vehicles in a video stream

Vivek Yadav 91 Nov 03, 2022
nvitop, an interactive NVIDIA-GPU process viewer, the one-stop solution for GPU process management

An interactive NVIDIA-GPU process viewer, the one-stop solution for GPU process management.

Xuehai Pan 1.3k Jan 02, 2023
Handout for the tutorial "Creating publication-quality figures with matplotlib"

Handout for the tutorial "Creating publication-quality figures with matplotlib"

JB Mouret 1.9k Jan 02, 2023
a plottling library for python, based on D3

Hello August 2013 Hello! Maybe you're looking for a nice Python interface to build interactive, javascript based plots that look as nice as all those

Mike Dewar 1.4k Dec 28, 2022
finds grocery stores and stuff next to route (gpx)

Route-Report Route report is a command-line utility that can be used to locate points-of-interest near your planned route (gpx). The results are based

Clemens Mosig 5 Oct 10, 2022
Profile and test to gain insights into the performance of your beautiful Python code

Profile and test to gain insights into the performance of your beautiful Python code View Demo - Report Bug - Request Feature QuickPotato in a nutshel

Joey Hendricks 138 Dec 06, 2022
Generating interfaces(CLI, Qt GUI, Dash web app) from a Python function.

oneFace is a Python library for automatically generating multiple interfaces(CLI, GUI, WebGUI) from a callable Python object. oneFace is an easy way t

NaNg 31 Oct 21, 2022
CONTRIBUTIONS ONLY: Voluptuous, despite the name, is a Python data validation library.

CONTRIBUTIONS ONLY What does this mean? I do not have time to fix issues myself. The only way fixes or new features will be added is by people submitt

Alec Thomas 1.8k Dec 31, 2022
patchwork for matplotlib

patchworklib patchwork for matplotlib test code Preparation of example plots import seaborn as sns import numpy as np import pandas as pd #Bri

Mori Hideto 185 Jan 06, 2023
metedraw is a project mainly for data visualization projects of Atmospheric Science, Marine Science, Environmental Science or other majors

It is mainly for data visualization projects of Atmospheric Science, Marine Science, Environmental Science or other majors.

Nephele 11 Jul 05, 2022
Manim is an animation engine for explanatory math videos.

A community-maintained Python framework for creating mathematical animations.

12.4k Dec 30, 2022
A streamlit component for bi-directional communication with bokeh plots.

Streamlit Bokeh Events A streamlit component for bi-directional communication with bokeh plots. Its just a workaround till streamlit team releases sup

Ashish Shukla 123 Dec 25, 2022