Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Overview

Olá!

Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

O código se encontra aqui e o dado pode ser obtido por meio desse link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  df = spark.read.option('header', True).csv(path_csv)
  return df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  df = spark.read.parquet(path_parquet) 
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
 
  df2 = spark.sql(QUERY)
    
  (
      df2
      .orderBy('year', ascending=False)
      .orderBy('month', ascending=False)
      .orderBy('day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )
  • Basicamente, extraimos os dados para a zona landing, depois, escrevemos o mesmo dado em diferente formato na zona processing, no caso parquet, por se tratar de um formato otimizado e mais leve.
  • Após, criamos uma view do dado recém salvo na zona processing, já em parquet, que otimiza a leitura do spark, aplicamos uma query de transformação que enriquece o schema do dado e seleciona apenas os dados de 2021 e 2022, já pronto para ser consumido.
  • E por fim, escrevemos na zona curated o dado já tratado, enriquecido, particionado por ano, mês e dia e pronto para consumo.

Para rodar o script, basicamente você pode fazer no terminal:

spark-submit etl.py

Você também encontrará o mesmo código e ideia de ETL em notebooks, em versão pyspark ou spark-sql.

Espero que gostem!

Qualquer dúvida, entrar em contato pelo LinkedIn.

:)

Owner
Henrique de Paula
Games e tech!
Henrique de Paula
Skforecast is a python library that eases using scikit-learn regressors as multi-step forecasters

Skforecast is a python library that eases using scikit-learn regressors as multi-step forecasters. It also works with any regressor compatible with the scikit-learn API (pipelines, CatBoost, LightGBM

Joaquín Amat Rodrigo 297 Jan 09, 2023
Scikit-learn compatible wrapper of the Random Bits Forest program written by (Wang et al., 2016)

sklearn-compatible Random Bits Forest Scikit-learn compatible wrapper of the Random Bits Forest program written by Wang et al., 2016, available as a b

Tamas Madl 8 Jul 24, 2021
Real-time stream processing for python

Streamz Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelin

Python Streamz 1.1k Dec 28, 2022
Optuna is an automatic hyperparameter optimization software framework, particularly designed for machine learning

Optuna is an automatic hyperparameter optimization software framework, particularly designed for machine learning. It features an imperative, define-by-run style user API.

7.4k Jan 04, 2023
Nixtla is an open-source time series forecasting library.

Nixtla Nixtla is an open-source time series forecasting library. We are helping data scientists and developers to have access to open source state-of-

Nixtla 401 Jan 08, 2023
whylogs: A Data and Machine Learning Logging Standard

whylogs: A Data and Machine Learning Logging Standard whylogs is an open source standard for data and ML logging whylogs logging agent is the easiest

WhyLabs 2k Jan 06, 2023
Traingenerator 🧙 A web app to generate template code for machine learning ✨

Traingenerator 🧙 A web app to generate template code for machine learning ✨ 🎉 Traingenerator is now live! 🎉

Johannes Rieke 1.2k Jan 07, 2023
Anytime Learning At Macroscale

On Anytime Learning At Macroscale Learning from sequential data dumps (key) Requirements Python 3.7 Pytorch 1.9.0 Hydra 1.1.0 (pip install hydra-core

Meta Research 8 Mar 29, 2022
Backtesting an algorithmic trading strategy using Machine Learning and Sentiment Analysis.

Trading Tesla with Machine Learning and Sentiment Analysis An interactive program to train a Random Forest Classifier to predict Tesla daily prices us

Renato Votto 31 Nov 17, 2022
Python library which makes it possible to dynamically mask/anonymize data using JSON string or python dict rules in a PySpark environment.

pyspark-anonymizer Python library which makes it possible to dynamically mask/anonymize data using JSON string or python dict rules in a PySpark envir

6 Jun 30, 2022
This jupyter notebook project was completed by me and my friend using the dataset from Kaggle

ARM This jupyter notebook project was completed by me and my friend using the dataset from Kaggle. The world Happiness 2017, which ranks 155 countries

1 Jan 23, 2022
A simple example of ML classification, cross validation, and visualization of feature importances

Simple-Classifier This is a basic example of how to use several different libraries for classification and ensembling, mostly with sklearn. Example as

Rob 2 Aug 25, 2022
Magenta: Music and Art Generation with Machine Intelligence

Magenta is a research project exploring the role of machine learning in the process of creating art and music. Primarily this involves developing new

Magenta 18.1k Dec 30, 2022
As we all know the BGMI Loot Crate comes with so many resources for the gamers, this ML Crate will be the hub of various ML projects which will be the resources for the ML enthusiasts! Open Source Program: SWOC 2021 and JWOC 2022.

Machine Learning Loot Crate 💻 🧰 🔴 Welcome contributors! As we all know the BGMI Loot Crate comes with so many resources for the gamers, this ML Cra

Abhishek Sharma 89 Dec 28, 2022
Microsoft Machine Learning for Apache Spark

Microsoft Machine Learning for Apache Spark MMLSpark is an ecosystem of tools aimed towards expanding the distributed computing framework Apache Spark

Microsoft Azure 3.9k Dec 30, 2022
Dieses Projekt ermöglicht es den Smartmeter der EVN (Netz Niederösterreich) über die Kundenschnittstelle auszulesen.

SmartMeterEVN Dieses Projekt ermöglicht es den Smartmeter der EVN (Netz Niederösterreich) über die Kundenschnittstelle auszulesen. Smart Meter werden

greenMike 43 Dec 04, 2022
A Python library for choreographing your machine learning research.

A Python library for choreographing your machine learning research.

AI2 270 Jan 06, 2023
nn-Meter is a novel and efficient system to accurately predict the inference latency of DNN models on diverse edge devices

A DNN inference latency prediction toolkit for accurately modeling and predicting the latency on diverse edge devices.

Microsoft 241 Dec 26, 2022
Predict the income for each percentile of the population (Python) - FRENCH

05.income-prediction Predict the income for each percentile of the population (Python) - FRENCH Effectuez une prédiction de revenus Prérequis Pour ce

1 Feb 13, 2022
Machine Learning e Data Science com Python

Machine Learning e Data Science com Python Arquivos do curso de Data Science e Machine Learning com Python na Udemy, cliqe aqui para acessá-lo. O prin

Renan Barbosa 1 Jan 27, 2022