This is my first repo covering, end to end, a pipeline over Brazilian government open data on contract purchases and annual schedules, built with Spark using PySpark and SQL!

Overview

Hello!

The code is available here, and the data can be obtained through this link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

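def csv_to_parquet(spark, path_csv, path_parquet):
  # Read the raw CSV from the landing zone; with no schema given, every column is read as a string.
  df = spark.read.option('header', True).csv(path_csv)
  # Rewrite the same data as Parquet in the processing zone.
  df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  # Register the Parquet data as the temporary view 'df' that QUERY selects from.
  df = spark.read.parquet(path_parquet)
  df.createOrReplaceTempView('df')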

def write_curated(spark, path_curated):
  # Run the transformation; create_view() must already have registered the 'df' view.
  df2 = spark.sql(QUERY)

  (
      df2
      # One orderBy with all three keys: chained orderBy calls each replace the previous sort.
      .orderBy('year', 'month', 'day', ascending=False)
      .write.partitionBy('year', 'month', 'day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )

  • First, we extract the data into the landing zone, then write the same data to the processing zone in a different format, Parquet, a compressed columnar format that is smaller on disk and faster to read.
  • Next, we create a view over the data just saved in the processing zone. Because it is already in Parquet, Spark reads it efficiently. We then apply a transformation query that enriches the schema (casting columns to proper types and deriving year, month and day from the due date, vencimento) and keeps only the 2021 and 2022 data.
  • Finally, we write the cleaned, enriched data to the curated zone, partitioned by year, month and day and ready for consumption.
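
The transformation and partitioning steps above can be illustrated without Spark. The sketch below is plain Python, not the pipeline itself: it applies the same kind of casts, derived date columns and 2021/2022 filter to two made-up rows, then builds the Hive-style directory (year=.../month=.../day=...) that Spark's partitionBy writes for each row. The sample rows and the helper names transform and partition_dir are hypothetical.

```python
from datetime import date
from decimal import Decimal
from pathlib import PurePosixPath

# Hypothetical sample rows; every CSV field arrives as a string.
RAW_ROWS = [
    {"id": "1", "contrato_id": "10", "vencimento": "2021-03-05", "valor": "1500.50"},
    {"id": "2", "contrato_id": "11", "vencimento": "2019-12-31", "valor": "99.90"},
]


def transform(row):
    """Mirror the casts and derived columns of the SQL query for one row."""
    due = date.fromisoformat(row["vencimento"])
    return {
        "id": int(row["id"]),
        "contrato_id": int(row["contrato_id"]),
        "vencimento": due,
        "valor": Decimal(row["valor"]).quantize(Decimal("0.01")),
        "year": due.year,
        "month": due.month,
        "day": due.day,
    }


def partition_dir(base, row):
    """Build the Hive-style partition directory for a transformed row."""
    return (
        PurePosixPath(base)
        / f"year={row['year']}"
        / f"month={row['month']}"
        / f"day={row['day']}"
    )


# Keep only 2021 and 2022, as the WHERE clause does.
curated = [r for r in map(transform, RAW_ROWS) if r["year"] in (2021, 2022)]

for r in curated:
    print(partition_dir("../datalake/curated", r))
```

Only the 2021 row survives the filter, and its files would land under the year=2021/month=3/day=5 directory of the curated zone.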

To run the script, run the following in a terminal:

spark-submit etl.py
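
After the job finishes, one quick way to check the result is to list the partition directories that were written. The snippet below is a minimal sketch using only the standard library. It assumes the curated output follows the year=/month=/day= directory layout produced by partitionBy. The function name list_partitions is hypothetical and not part of the repo.

```python
from pathlib import Path


def list_partitions(curated_root):
    """Return sorted (year, month, day) tuples found under curated_root."""
    found = set()
    # One directory level per partition column: year=YYYY/month=M/day=D
    for day_dir in Path(curated_root).glob("year=*/month=*/day=*"):
        if not day_dir.is_dir():
            continue
        # The last three path components hold the partition values.
        values = [part.split("=", 1)[1] for part in day_dir.parts[-3:]]
        try:
            found.add(tuple(int(v) for v in values))
        except ValueError:
            # Skip directories whose values are not integers.
            continue
    return sorted(found)
```

Calling list_partitions('../datalake/curated') from the script's directory (the value of PATH_CURATED_ZONE) should return only dates in 2021 and 2022 if the filter worked as intended.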

You will also find the same code and ETL idea in notebooks, in PySpark and Spark SQL versions.

I hope you like it!

If you have any questions, get in touch via LinkedIn.

:)

Owner
Henrique de Paula