This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the SDK installed, and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner:

Description of the pipeline

Data input

We are using here a public PubSub topic with data, so we don't need to setup our own to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata, that is useful for streaming pipelines. In this case, the messages contain an attribute of name ts, which contains the same timestamp as the field of name timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud cli utility (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or if you have jq (for pretty printing of JSON)

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the GCloud cli utility installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:

After that, you can create the destination tables with the provided script

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are 10 seconds within each other. Any message with a timestamp more than 10 seconds apart will be discarded (for old timestamps) or will open a new window (for newer timestamps).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session, as the difference between the min and max timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate with all the messages in a window. This will load all the messages in the window into memory. This is fine, as in Beam streaming, a window is always processed in a worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. This grouping of messages across ride_id and event timestamps is automatically done by the pipeline, and we just need to express the generic operations to be performed with each window, as part of our pipeline.

Running the pipeline

Prerequirements

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:

Make sure that you have a Python environment with Python 3 (<3.9). For instance a virtualenv, and install apache-beam[gcp] and python-dateutil in your local environment. For instance, assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extracts insights out of them.

In this repository, you will find some sample output in data/beam.prof, that you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Owner
Israel Herraiz
Strategic Cloud Engineer @GoogleCloudPlatform
Israel Herraiz
股票行情实时数据接口-A股,完全免费的沪深证券股票数据-中国股市,python最简封装的API接口

股票行情实时数据接口-A股,完全免费的沪深证券股票数据-中国股市,python最简封装的API接口,包含日线,历史K线,分时线,分钟线,全部实时采集,系统包括新浪腾讯双数据核心采集获取,自动故障切换,STOCK数据格式成DataFrame格式,可用来查询研究量化分析,股票程序自动化交易系统.为量化研究者在数据获取方面极大地减轻工作量,更加专注于策略和模型的研究与实现。

dev 572 Jan 08, 2023
Automatization of BoxPlot graph usin Python MatPlotLib and Excel

BoxPlotGraphAutomation Automatization of BoxPlot graph usin Python / Excel. This file is an automation of BoxPlot-Graph using python graph library mat

EricAugustin 1 Feb 07, 2022
Create a visualization for Trump's Tweeted Words Using Python

Data Trump's Tweeted Words This plot illustrates twitter word occurences. We already did the coding I needed for this plot, so I was very inspired to

7 Mar 27, 2022
Application for viewing pokemon regional variants.

Pokemon Regional Variants Application Application for viewing pokemon regional variants. Run The Source Code Download Python https://www.python.org/do

Michael J Bailey 4 Oct 08, 2021
Visualizations for machine learning datasets

Introduction The facets project contains two visualizations for understanding and analyzing machine learning datasets: Facets Overview and Facets Dive

PAIR code 7.1k Jan 07, 2023
100 Days of Code The Complete Python Pro Bootcamp for 2022

100-Day-With-Python 100 Days of Code - The Complete Python Pro Bootcamp for 2022. In this course, I spend with python language over 100 days, and I up

Rajdip Das 8 Jun 22, 2022
Lightweight, extensible data validation library for Python

Cerberus Cerberus is a lightweight and extensible data validation library for Python. v = Validator({'name': {'type': 'string'}}) v.validate({

eve 2.9k Dec 27, 2022
Python histogram library - histograms as updateable, fully semantic objects with visualization tools. [P]ython [HYST]ograms.

physt P(i/y)thon h(i/y)stograms. Inspired (and based on) numpy.histogram, but designed for humans(TM) on steroids(TM). The goal is to unify different

Jan Pipek 120 Dec 08, 2022
Python implementation of the Density Line Chart by Moritz & Fisher.

PyDLC - Density Line Charts with Python Python implementation of the Density Line Chart (Moritz & Fisher, 2018) to visualize large collections of time

Charles L. Bérubé 10 Jan 06, 2023
Visual Python is a GUI-based Python code generator, developed on the Jupyter Notebook environment as an extension.

Visual Python is a GUI-based Python code generator, developed on the Jupyter Notebook environment as an extension.

Visual Python 564 Jan 03, 2023
Uniform Manifold Approximation and Projection

UMAP Uniform Manifold Approximation and Projection (UMAP) is a dimension reduction technique that can be used for visualisation similarly to t-SNE, bu

Leland McInnes 6k Jan 08, 2023
3D Vision functions with end-to-end support for deep learning developers, written in Ivy.

Ivy vision focuses predominantly on 3D vision, with functions for camera geometry, image projections, co-ordinate frame transformations, forward warping, inverse warping, optical flow, depth triangul

Ivy 61 Dec 29, 2022
Political elections, appointment, analysis and visualization in Python

Political elections, appointment, analysis and visualization in Python poli-sci-kit is a Python package for political science appointment and election

Andrew Tavis McAllister 9 Dec 01, 2022
Small project to recursively calculate and plot each successive order of the Hilbert Curve

hilbert-curve Small project to recursively calculate and plot each successive order of the Hilbert Curve. After watching 3Blue1Brown's video on Hilber

Stefan Mejlgaard 2 Nov 15, 2021
A minimalistic wrapper around PyOpenGL to save development time

glpy glpy is pyOpenGl wrapper which lets you work with pyOpenGl easily.It is not meant to be a replacement for pyOpenGl but runs on top of pyOpenGl to

Abhinav 9 Apr 02, 2022
A GUI for Pandas DataFrames

PandasGUI A GUI for analyzing Pandas DataFrames. Demo Installation Install latest release from PyPi: pip install pandasgui Install directly from Githu

Adam 2.8k Jan 03, 2023
Python package for the analysis and visualisation of finite-difference fields.

discretisedfield Marijan Beg1,2, Martin Lang2, Samuel Holt3, Ryan A. Pepper4, Hans Fangohr2,5,6 1 Department of Earth Science and Engineering, Imperia

ubermag 12 Dec 14, 2022
This is Pygrr PolyArt, a program used for drawing custom Polygon models for your Pygrr project!

This is Pygrr PolyArt, a program used for drawing custom Polygon models for your Pygrr project!

Isaac 4 Dec 14, 2021
Python scripts to manage Chia plots and drive space, providing full reports. Also monitors the number of chia coins you have.

Chia Plot, Drive Manager & Coin Monitor (V0.5 - April 20th, 2021) Multi Server Chia Plot and Drive Management Solution Be sure to ⭐ my repo so you can

338 Nov 25, 2022
cqMore is a CadQuery plugin based on CadQuery 2.1.

cqMore (under construction) cqMore is a CadQuery plugin based on CadQuery 2.1. Installation Please use conda to install CadQuery and its dependencies

Justin Lin 36 Dec 21, 2022