Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project contains the process from building a web crawler to extract the raw data of house price to create ETL pipelines using Google Could Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built with both Apache Beam using Cloud Dataflow and Spark using Cloud Dataproc for loading real estate transactions data into BigQuery, and the data can be visualized in Data Studio. The project also uses Cloud Function to monitor if a new file is uploaded in the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Actual price registration of real estate transactions data in Taiwan has been released since 2012, which refers to the transaction information includes: position and area of real estate, total price of land and building, parking space related information, etc. We can use the data to observe the changes in house prices over time or predict the house price trend in various regions.

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache beam version: 2.33.0
  • Pyspark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download historical actual price data in csv format, and upload the files to the Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open crawler.py file, replace YOUR_DIR_PATH with a local directory to store download data, replace projectID with your Google Cloud project ID, and replace GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py

3. Build ETL pipelines on GCP

There are two versions of ETL pipelines that read source files from Cloud Storage, apply some transformations and load the data into BigQuery. One of the ETL pipelines based on Apache beam uses Dataflow to process the data for analytics of land transaction. The other ETL pipeline based on Apache Spark uses Dataproc to proccess the data for analytics of building transaction.

Let’s start by opening a session in Google Cloud Shell. Run the following commands to set the project property with your project ID.

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Run actual_price_etl.py to create a Dataflow job which runs the DataflowRunner. Notice that we need to set the Cloud Storage location of the staging and template file, and set the region in which the created job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the etl pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name, and set the region in which the created job should run. To write data to Bigquery, the jar file of spark-bigquery-connector must be available at runtime.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

4. Use a Cloud Function to trigger Cloud Dataflow

Use the Cloud Fucntion to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template for runnig the data pipeline with REST API request called by the Cloud Function. The file etl_pipeline_beam_auto.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage location of the staging, temporary and template file, and set the region in which the created job should run.

python -m etl_pipeline_beam_auto \
    --runner DataflowRunner \
    --project projectID \
    --region=region \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/template \
    --save_main_session

Create a Cloud Function

Go to the Cloud Function GUI and manually create a function, set Trigger as Cloud Storage, Event Type as Finalize/Create , and choose the GCS bucket which needs to be monitored. Next, write the function itself, use the code in main.py file. Note that the user defined parameter input is passed to the Dataflow pipeline job. Finally, click on depoly and now your function is ready to execute and start the Dataflow pipeline when a file is uploaded in your bucket.

Results

When each ETL pipeline is completed and succeeded, navigating to BigQuery to verify that the data is successfully loaded in the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate average price by year in BigQuery, and visualize the results in Data Studio.

Data Studio - Average land price by year in Yilan County

AptaMat is a simple script which aims to measure differences between DNA or RNA secondary structures.

AptaMAT Purpose AptaMat is a simple script which aims to measure differences between DNA or RNA secondary structures. The method is based on the compa

GEC UTC 3 Nov 03, 2022
Data Analytics: Modeling and Studying data relating to climate change and adoption of electric vehicles

Correlation-Study-Climate-Change-EV-Adoption Data Analytics: Modeling and Studying data relating to climate change and adoption of electric vehicles I

Jonathan Feng 1 Jan 03, 2022
Fitting thermodynamic models with pycalphad

ESPEI ESPEI, or Extensible Self-optimizing Phase Equilibria Infrastructure, is a tool for thermodynamic database development within the CALPHAD method

Phases Research Lab 42 Sep 12, 2022
A probabilistic programming library for Bayesian deep learning, generative models, based on Tensorflow

ZhuSuan is a Python probabilistic programming library for Bayesian deep learning, which conjoins the complimentary advantages of Bayesian methods and

Tsinghua Machine Learning Group 2.2k Dec 28, 2022
DaDRA (day-druh) is a Python library for Data-Driven Reachability Analysis.

DaDRA (day-druh) is a Python library for Data-Driven Reachability Analysis. The main goal of the package is to accelerate the process of computing estimates of forward reachable sets for nonlinear dy

2 Nov 08, 2021
Pandas and Dask test helper methods with beautiful error messages.

beavis Pandas and Dask test helper methods with beautiful error messages. test helpers These test helper methods are meant to be used in test suites.

Matthew Powers 18 Nov 28, 2022
Python reader for Linked Data in HDF5 files

Linked Data are becoming more popular for user-created metadata in HDF5 files.

The HDF Group 8 May 17, 2022
Data pipelines built with polars

valves Warning: the project is very much work in progress. Valves is a collection of functions for your data .pipe()-lines. This project aimes to host

14 Jan 03, 2023
This repository contains some analysis of possible nerdle answers

Nerdle Analysis https://nerdlegame.com/ This repository contains some analysis of possible nerdle answers. Here's a quick overview: nerdle.py contains

0 Dec 16, 2022
pyhsmm MITpyhsmm - Bayesian inference in HSMMs and HMMs. MIT

Bayesian inference in HSMMs and HMMs This is a Python library for approximate unsupervised inference in Bayesian Hidden Markov Models (HMMs) and expli

Matthew Johnson 527 Dec 04, 2022
Pandas and Spark DataFrame comparison for humans

DataComPy DataComPy is a package to compare two Pandas DataFrames. Originally started to be something of a replacement for SAS's PROC COMPARE for Pand

Capital One 259 Dec 24, 2022
Exploratory Data Analysis of the 2019 Indian General Elections using a dataset from Kaggle.

2019-indian-election-eda Exploratory Data Analysis of the 2019 Indian General Elections using a dataset from Kaggle. This project is a part of the Cou

Souradeep Banerjee 5 Oct 10, 2022
Data Scientist in Simple Stock Analysis of PT Bukalapak.com Tbk for Long Term Investment

Data Scientist in Simple Stock Analysis of PT Bukalapak.com Tbk for Long Term Investment Brief explanation of PT Bukalapak.com Tbk Bukalapak was found

Najibulloh Asror 2 Feb 10, 2022
Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation

Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation Overview Consider the scenario in which advertisement

Manuel Bressan 2 Nov 18, 2021
Flexible HDF5 saving/loading and other data science tools from the University of Chicago

deepdish Flexible HDF5 saving/loading and other data science tools from the University of Chicago. This repository also host a Deep Learning blog: htt

UChicago - Department of Computer Science 255 Dec 10, 2022
ASTR 302: Python for Astronomy (Winter '22)

ASTR 302, Winter 2022, University of Washington: Python for Astronomy Mario Jurić Location When: 2:30-3:50, Monday & Wednesday, Winter quarter 2022 Wh

UW ASTR 302: Python for Astronomy 4 Jan 12, 2022
WaveFake: A Data Set to Facilitate Audio DeepFake Detection

WaveFake: A Data Set to Facilitate Audio DeepFake Detection This is the code repository for our NeurIPS 2021 (Track on Datasets and Benchmarks) paper

Chair for Sys­tems Se­cu­ri­ty 27 Dec 22, 2022
Analyzing Earth Observation (EO) data is complex and solutions often require custom tailored algorithms.

eo-grow Earth observation framework for scaled-up processing in Python. Analyzing Earth Observation (EO) data is complex and solutions often require c

Sentinel Hub 18 Dec 23, 2022
Open-source Laplacian Eigenmaps for dimensionality reduction of large data in python.

Fast Laplacian Eigenmaps in python Open-source Laplacian Eigenmaps for dimensionality reduction of large data in python. Comes with an wrapper for NMS

17 Jul 09, 2022