Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the Apache Beam SDK installed and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner.

Description of the pipeline

Data input

We use a public PubSub topic with data here, so we don't need to set up our own topic to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata that is useful for streaming pipelines. In this case, each message carries an attribute named ts, which contains the same timestamp as the timestamp field in the data. Remember that PubSub treats the message payload as just a string of bytes, so it does not know anything about the data itself. Metadata attributes are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud CLI (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or, if you have jq installed (for pretty-printing the JSON):

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (the metadata). You will see that the timestamp is included as an attribute in the metadata, as well as in the data. We will leverage that metadata attribute for the timestamps used in our streaming pipeline.
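
As an illustration of how a pipeline can leverage that attribute (a minimal sketch, not the repository's actual code), Beam's PubSub source accepts a timestamp_attribute parameter that makes the attribute, rather than the publish time, drive watermarks and windowing:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# A minimal sketch: read the public taxi rides topic, using the "ts"
# metadata attribute as the event timestamp for windowing.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    rides = p | "ReadTaxiRides" >> beam.io.ReadFromPubSub(
        topic="projects/pubsub-public-data/topics/taxirides-realtime",
        timestamp_attribute="ts",
    )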

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.
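
As a hedged sketch of what such a write looks like in the Beam Python SDK (the table name and sample row are illustrative, not the repository's actual code):

import apache_beam as beam

with beam.Pipeline() as p:
    rows = p | beam.Create([{"ride_id": "abc", "duration": 42.0, "n_events": 7}])
    # Append-only streaming inserts into a table that must already exist.
    rows | beam.io.WriteToBigQuery(
        "my-project:taxi_rides.sessions",  # illustrative project:dataset.table
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    )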

If you have the Google Cloud SDK installed (it is available by default in the Cloud Shell), you can create the dataset and tables from the command line.

You also need to create a BigQuery dataset, in the same region:
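
For instance, with the bq tool from the Cloud SDK (assuming $REGION holds your Dataflow region; taxi_rides is the dataset name used by the script below):

bq --location=$REGION mk --dataset taxi_rides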

After that, you can create the destination tables with the provided script:

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are within 10 seconds of each other. A message whose timestamp is more than 10 seconds apart will either be discarded (if its timestamp is older) or will open a new session window (if it is newer).

With the messages inside each window (that is, each different ride_id will be part of a different window), we calculate the duration of the session as the difference between the minimum and maximum timestamps in the window. We also calculate the number of events in that session.

We use a GroupByKey to operate on all the messages in a window. This loads all the messages in the window into memory. This is fine because, in Beam streaming, a window is always processed by a single worker (windows cannot be split across different workers).
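
Putting this together, here is a minimal, self-contained sketch of the session logic on the direct runner, with in-memory data standing in for PubSub (the helper and the ride ids are illustrative, not the repository's code):

import apache_beam as beam
from apache_beam.transforms.window import Sessions, TimestampedValue

def session_stats(kv):
    # kv is (ride_id, iterable of event timestamps) for one session window.
    ride_id, timestamps = kv
    ts = sorted(timestamps)
    return {"ride_id": ride_id,
            "duration": ts[-1] - ts[0],  # max - min timestamp in the session
            "n_events": len(ts)}

with beam.Pipeline() as p:
    (
        p
        | beam.Create([("ride-a", 0.0), ("ride-a", 4.0), ("ride-a", 8.0),
                       ("ride-b", 2.0), ("ride-b", 30.0)])
        # Attach event timestamps; in the real pipeline these come from
        # the "ts" message attribute.
        | beam.Map(lambda kv: TimestampedValue(kv, kv[1]))
        | beam.WindowInto(Sessions(gap_size=10))  # 10-second session gap
        | beam.GroupByKey()  # all messages of a ride_id in a window
        | beam.Map(session_stats)
        | beam.Map(print)
    )

Note that the two ride-b events are more than 10 seconds apart, so they end up in two separate one-event sessions.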

This is an example of the kind of logic that can be implemented by leveraging windows in streaming pipelines. The grouping of messages by ride_id and event timestamp is done automatically by the pipeline; we just need to express the generic operations to be performed on each window, as part of our pipeline.

Running the pipeline

Prerequisites

You need a Google Cloud project and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (where gcloud is automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, in the same region where you will run Dataflow:
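
For instance (assuming $PROJECT holds your project id and $REGION your Dataflow region):

gsutil mb -l $REGION gs://$PROJECT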

Make sure that you have a Python 3 environment with Python < 3.9 (for instance, a virtualenv), and install apache-beam[gcp] and python-dateutil in it. Assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.
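
For reference, CPU profiling in the Beam Python SDK is enabled through its profiling pipeline options; a hedged example of the kind of flags the launch script may pass (the exact profile path is an assumption):

--profile_cpu --profile_location=gs://$PROJECT/profile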

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extract insights from them.
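
As a quick start, here is a minimal sketch using Python's built-in pstats module, assuming the dump is readable as cProfile/pstats data (Beam's Python profiler is built on cProfile):

import pstats

# Load the sample profile shipped with the repository and print the
# 10 entries with the highest cumulative time.
stats = pstats.Stats("data/beam.prof")
stats.sort_stats("cumulative").print_stats(10)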

In this repository, you will find some sample output in data/beam.prof, which you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
